Skip to content

Conversation

@huanliwang-db
Copy link
Contributor

@huanliwang-db huanliwang-db commented May 3, 2024

What changes were proposed in this pull request?

  • When we close the hdfs state store, we should only remove the entry from loadedMaps rather than doing the active data cleanup. JVM GC should be able to help us GC those objects.
  • we should wait for the maintenance thread to stop before unloading the providers.

Why are the changes needed?

There are two race conditions between state store snapshotting and state store unloading which could result in query failure and potential data corruption.

Case 1:

  1. the maintenance thread pool encounters some issues and call the stopMaintenanceTask, this function further calls threadPool.stop. However, this function doesn't wait for the stop operation to be completed and move to do the state store unload and clear.
  2. the provider unload will close the state store which clear the values of loadedMaps for HDFS backed state store.
  3. if the not-yet-stop maintenance thread is still running and trying to do the snapshot, but the data in the underlying HDFSBackedStateStoreMap has been removed. if this snapshot process completes successfully, then we will write corrupted data and the following batches will consume this corrupted data.

Case 2:

  1. In executor_1, the maintenance thread is going to do the snapshot for state_store_1, it retrieves the HDFSBackedStateStoreMap object from the loadedMaps, after this, the maintenance thread releases the lock of the loadedMaps.
  2. state_store_1 is loaded in another executor, e.g. executor_2.
  3. another state store, state_store_2, is loaded on executor_1 and reportActiveStoreInstance to driver.
  4. executor_1 does the unload for those no longer active state store which clears the data entries in the HDFSBackedStateStoreMap
  5. the snapshotting thread is terminated and uploads the incomplete snapshot to cloud because the iterator doesn't have next element after doing the clear.
  6. future batches are consuming the corrupted data.

Does this PR introduce any user-facing change?

No

How was this patch tested?

[info] Run completed in 2 minutes, 55 seconds.
[info] Total number of tests run: 153
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 153, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 271 s (04:31), completed May 2, 2024, 6:26:33 PM

before this change

[info] - state store unload/close happens during the maintenance *** FAILED *** (648 milliseconds)
[info]   Vector("a1", "a10", "a11", "a12", "a13", "a14", "a15", "a16", "a17", "a18", "a19", "a2", "a20", "a3", "a4", "a5", "a6", "a7", "a8", "a9") did not equal ArrayBuffer("a8") (StateStoreSuite.scala:414)
[info]   Analysis:
[info]   Vector1(0: "a1" -> "a8", 1: "a10" -> , 2: "a11" -> , 3: "a12" -> , 4: "a13" -> , 5: "a14" -> , 6: "a15" -> , 7: "a16" -> , 8: "a17" -> , 9: "a18" -> , 10: "a19" -> , 11: "a2" -> , 12: "a20" -> , 13: "a3" -> , 14: "a4" -> , 15: "a5" -> , 16: "a6" -> , 17: "a7" -> , 18: "a8" -> , 19: "a9" -> )
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.$anonfun$new$39(StateStoreSuite.scala:414)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuiteBase.tryWithProviderResource(StateStoreSuite.scala:1663)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.$anonfun$new$38(StateStoreSuite.scala:394)
18:32:09.694 WARN org.apache.spark.sql.execution.streaming.state.StateStoreSuite: 

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.StateStoreSuite, threads: ForkJoinPool.commonPool-worker-1 (daemon=true) =====
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.org$scalatest$BeforeAndAfter$$super$runTest(StateStoreSuite.scala:90)
[info]   at org.scalatest.BeforeAndAfter.runTest(BeforeAndAfter.scala:213)
[info]   at org.scalatest.BeforeAndAfter.runTest$(BeforeAndAfter.scala:203)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.runTest(StateStoreSuite.scala:90)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.org$scalatest$BeforeAndAfter$$super$run(StateStoreSuite.scala:90)
[info]   at org.scalatest.BeforeAndAfter.run(BeforeAndAfter.scala:273)
[info]   at org.scalatest.BeforeAndAfter.run$(BeforeAndAfter.scala:271)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.run(StateStoreSuite.scala:90)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)
[info] Run completed in 2 seconds, 4 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***


Was this patch authored or co-authored using generative AI tooling?

No

@huanliwang-db huanliwang-db changed the title Fix the race condition between state store unload and snapshotting [SPARK-48105] Fix the race condition between state store unload and snapshotting May 3, 2024
@huanliwang-db huanliwang-db marked this pull request as ready for review May 3, 2024 03:16
@huanliwang-db huanliwang-db changed the title [SPARK-48105] Fix the race condition between state store unload and snapshotting [SPARK-48105] Fix the race condition between state store unloading and snapshotting May 3, 2024

override def close(): Unit = {
synchronized { loadedMaps.values.asScala.foreach(_.clear()) }
synchronized { loadedMaps.keySet().asScala.toSeq.foreach(loadedMaps.remove) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a comment here explaining that the underlying value needs to remain alive/cannot be cleared - since there could be other readers with shallow references ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Btw, do we have any reason to remove entities one by one from loadedMaps? This is now the same as loadedMaps.clear(), unless you see some difference w.r.t. behavior on concurrent operation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I think thats true - we could prob just clear loadedMaps entirely ? just to confirm - it won't somehow cause clear to be called on the values though right ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the implementation of treeMap.clear is

public void clear() {
        modCount++;
        size = 0;
        root = null;
    }

where

private transient Entry<K,V> root;

so i think it just resets for the root and won't clear the values

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, shouldn't happen, because there is no context caller has intended to do so.

Copy link
Contributor

@HeartSaVioR HeartSaVioR May 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so i think it just resets for the root and won't clear the values

If we have belief of the implementation of GC then this shouldn't matter at all. The entry instance referenced by root is dereferenced, losing reference count, and will be GC-ed. cascading effect would apply to the whole tree and effectively all of them will be GC-ed.

It might be still a best effort of freeing memory from these entries earlier, likewise we clear the value previously, but in theory they are semantically same. I'm OK with keeping best effort of it though. Not a big deal. Probably one-liner code comment on intention?

}

override def close(): Unit = {
synchronized { loadedMaps.values.asScala.foreach(_.clear()) }
Copy link
Contributor

@HeartSaVioR HeartSaVioR May 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow... this was totally unsafe... It's blindly assuming that after calling close() nothing will read from loadedMaps and its entities.

If we were ever doing this we had to also make sure these entries must be removed from loadedMaps as well... (and/or explicitly invalidate this instance so that loadedMaps (and its entities) can never be accessed afterwards.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea - the caller was only removing the entry from loadedProviders basically - as part of the unload


override def close(): Unit = {
synchronized { loadedMaps.values.asScala.foreach(_.clear()) }
synchronized { loadedMaps.keySet().asScala.toSeq.foreach(loadedMaps.remove) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flipping the coin, we can also resolve this issue via explicitly copying the map when snapshotting. This way seems to be simpler to understand, but I see the proposal as more optimized so OK for me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea for large state - our memory usage would increase too. So thought that this way might be better - to continue working with the shallow references ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may not be very huge as we are only copying the "map" and the memory usage of UnsafeRow would be the same, but the proposed way is still an optimized one (removing the overhead of copying) so I agree it's better.

@HeartSaVioR HeartSaVioR changed the title [SPARK-48105] Fix the race condition between state store unloading and snapshotting [SPARK-48105][SS] Fix the race condition between state store unloading and snapshotting May 3, 2024
@anishshri-db
Copy link
Contributor

@HeartSaVioR - do we also need to port to older Spark versions ? 3.5 maybe ?

@anishshri-db
Copy link
Contributor

@huanliwang-db - actually do we need clear in the HDFSStateStoreMap implementation any more - are there any more callers for that function ? I guess we can remove that interface and the corresponding implementation too ?

@huanliwang-db
Copy link
Contributor Author

@huanliwang-db - actually do we need clear in the HDFSStateStoreMap implementation any more - are there any more callers for that function ? I guess we can remove that interface and the corresponding implementation too ?

@anishshri-db there is no other caller, we can remove it. cc: @HeartSaVioR

@HeartSaVioR
Copy link
Contributor

HeartSaVioR commented May 3, 2024

@HeartSaVioR - do we also need to port to older Spark versions ? 3.5 maybe ?

Ideally all non-EOL version lines, 3.5/3.4.

@anishshri-db there is no other caller, we can remove it. cc: @HeartSaVioR

Yeah feel free to remove it.

@huanliwang-db
Copy link
Contributor Author

Ideally all non-EOL version lines, 3.5/3.4.

@HeartSaVioR how can i do the backport?

@HeartSaVioR
Copy link
Contributor

Ideally all non-EOL version lines, 3.5/3.4.

@HeartSaVioR how can i do the backport?

I can try cherry pick when I merge the PR. If that has conflicts I'll ask you to submit a backport PR.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@HeartSaVioR
Copy link
Contributor

Thanks! Merging to master/3.5/3.4.

@HeartSaVioR
Copy link
Contributor

3.5 has a conflict - probably 3.4 would also have a conflict.
@huanliwang-db Could you please help submitting backport PRs for these branches? Thanks in advance!

huanliwang-db added a commit to huanliwang-db/spark that referenced this pull request May 7, 2024
…g and snapshotting

* When we close the hdfs state store, we should only remove the entry from `loadedMaps` rather than doing the active data cleanup. JVM GC should be able to help us GC those objects.
* we should wait for the maintenance thread to stop before unloading the providers.

There are two race conditions between state store snapshotting and state store unloading which could result in query failure and potential data corruption.

Case 1:
1. the maintenance thread pool encounters some issues and call the [stopMaintenanceTask,](https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774) this function further calls [threadPool.stop.](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587) However, this function doesn't wait for the stop operation to be completed and move to do the state store [unload and clear.](https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778)
2. the provider unload will [close the state store](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721) which [clear the values of loadedMaps](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355) for HDFS backed state store.
3. if the not-yet-stop maintenance thread is still running and trying to do the snapshot, but the data in the underlying `HDFSBackedStateStoreMap` has been removed. if this snapshot process completes successfully, then we will write corrupted data and the following batches will consume this corrupted data.

Case 2:

1. In executor_1, the maintenance thread is going to do the snapshot for state_store_1, it retrieves the `HDFSBackedStateStoreMap` object from the loadedMaps, after this, the maintenance thread [releases the lock of the loadedMaps](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751).
2. state_store_1 is loaded in another executor, e.g. executor_2.
3. another state store, state_store_2, is loaded on executor_1 and [reportActiveStoreInstance](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871) to driver.
4. executor_1 does the [unload](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713) for those no longer active state store which clears the data entries in the `HDFSBackedStateStoreMap`
5. the snapshotting thread is terminated and uploads the incomplete snapshot to cloud because the [iterator doesn't have next element](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634) after doing the clear.
6. future batches are consuming the corrupted data.

No

```
[info] Run completed in 2 minutes, 55 seconds.
[info] Total number of tests run: 153
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 153, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 271 s (04:31), completed May 2, 2024, 6:26:33 PM
```
before this change

```
[info] - state store unload/close happens during the maintenance *** FAILED *** (648 milliseconds)
[info]   Vector("a1", "a10", "a11", "a12", "a13", "a14", "a15", "a16", "a17", "a18", "a19", "a2", "a20", "a3", "a4", "a5", "a6", "a7", "a8", "a9") did not equal ArrayBuffer("a8") (StateStoreSuite.scala:414)
[info]   Analysis:
[info]   Vector1(0: "a1" -> "a8", 1: "a10" -> , 2: "a11" -> , 3: "a12" -> , 4: "a13" -> , 5: "a14" -> , 6: "a15" -> , 7: "a16" -> , 8: "a17" -> , 9: "a18" -> , 10: "a19" -> , 11: "a2" -> , 12: "a20" -> , 13: "a3" -> , 14: "a4" -> , 15: "a5" -> , 16: "a6" -> , 17: "a7" -> , 18: "a8" -> , 19: "a9" -> )
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.$anonfun$new$39(StateStoreSuite.scala:414)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuiteBase.tryWithProviderResource(StateStoreSuite.scala:1663)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.$anonfun$new$38(StateStoreSuite.scala:394)
18:32:09.694 WARN org.apache.spark.sql.execution.streaming.state.StateStoreSuite:

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.StateStoreSuite, threads: ForkJoinPool.commonPool-worker-1 (daemon=true) =====
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.org$scalatest$BeforeAndAfter$$super$runTest(StateStoreSuite.scala:90)
[info]   at org.scalatest.BeforeAndAfter.runTest(BeforeAndAfter.scala:213)
[info]   at org.scalatest.BeforeAndAfter.runTest$(BeforeAndAfter.scala:203)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.runTest(StateStoreSuite.scala:90)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.org$scalatest$BeforeAndAfter$$super$run(StateStoreSuite.scala:90)
[info]   at org.scalatest.BeforeAndAfter.run(BeforeAndAfter.scala:273)
[info]   at org.scalatest.BeforeAndAfter.run$(BeforeAndAfter.scala:271)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.run(StateStoreSuite.scala:90)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)
[info] Run completed in 2 seconds, 4 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***

```

No

Closes apache#46351 from huanliwang-db/race.

Authored-by: Huanli Wang <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR pushed a commit that referenced this pull request May 15, 2024
…oading and snapshotting

* When we close the hdfs state store, we should only remove the entry from `loadedMaps` rather than doing the active data cleanup. JVM GC should be able to help us GC those objects.
* we should wait for the maintenance thread to stop before unloading the providers.

There are two race conditions between state store snapshotting and state store unloading which could result in query failure and potential data corruption.

Case 1:
1. the maintenance thread pool encounters some issues and call the [stopMaintenanceTask,](https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774) this function further calls [threadPool.stop.](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587) However, this function doesn't wait for the stop operation to be completed and move to do the state store [unload and clear.](https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778)
2. the provider unload will [close the state store](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721) which [clear the values of loadedMaps](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355) for HDFS backed state store.
3. if the not-yet-stop maintenance thread is still running and trying to do the snapshot, but the data in the underlying `HDFSBackedStateStoreMap` has been removed. if this snapshot process completes successfully, then we will write corrupted data and the following batches will consume this corrupted data.

Case 2:

1. In executor_1, the maintenance thread is going to do the snapshot for state_store_1, it retrieves the `HDFSBackedStateStoreMap` object from the loadedMaps, after this, the maintenance thread [releases the lock of the loadedMaps](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751).
2. state_store_1 is loaded in another executor, e.g. executor_2.
3. another state store, state_store_2, is loaded on executor_1 and [reportActiveStoreInstance](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871) to driver.
4. executor_1 does the [unload](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713) for those no longer active state store which clears the data entries in the `HDFSBackedStateStoreMap`
5. the snapshotting thread is terminated and uploads the incomplete snapshot to cloud because the [iterator doesn't have next element](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634) after doing the clear.
6. future batches are consuming the corrupted data.

No

```
[info] Run completed in 2 minutes, 55 seconds.
[info] Total number of tests run: 153
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 153, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 271 s (04:31), completed May 2, 2024, 6:26:33 PM
```
before this change

```
[info] - state store unload/close happens during the maintenance *** FAILED *** (648 milliseconds)
[info]   Vector("a1", "a10", "a11", "a12", "a13", "a14", "a15", "a16", "a17", "a18", "a19", "a2", "a20", "a3", "a4", "a5", "a6", "a7", "a8", "a9") did not equal ArrayBuffer("a8") (StateStoreSuite.scala:414)
[info]   Analysis:
[info]   Vector1(0: "a1" -> "a8", 1: "a10" -> , 2: "a11" -> , 3: "a12" -> , 4: "a13" -> , 5: "a14" -> , 6: "a15" -> , 7: "a16" -> , 8: "a17" -> , 9: "a18" -> , 10: "a19" -> , 11: "a2" -> , 12: "a20" -> , 13: "a3" -> , 14: "a4" -> , 15: "a5" -> , 16: "a6" -> , 17: "a7" -> , 18: "a8" -> , 19: "a9" -> )
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.$anonfun$new$39(StateStoreSuite.scala:414)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuiteBase.tryWithProviderResource(StateStoreSuite.scala:1663)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.$anonfun$new$38(StateStoreSuite.scala:394)
18:32:09.694 WARN org.apache.spark.sql.execution.streaming.state.StateStoreSuite:

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.StateStoreSuite, threads: ForkJoinPool.commonPool-worker-1 (daemon=true) =====
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.org$scalatest$BeforeAndAfter$$super$runTest(StateStoreSuite.scala:90)
[info]   at org.scalatest.BeforeAndAfter.runTest(BeforeAndAfter.scala:213)
[info]   at org.scalatest.BeforeAndAfter.runTest$(BeforeAndAfter.scala:203)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.runTest(StateStoreSuite.scala:90)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.org$scalatest$BeforeAndAfter$$super$run(StateStoreSuite.scala:90)
[info]   at org.scalatest.BeforeAndAfter.run(BeforeAndAfter.scala:273)
[info]   at org.scalatest.BeforeAndAfter.run$(BeforeAndAfter.scala:271)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.run(StateStoreSuite.scala:90)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)
[info] Run completed in 2 seconds, 4 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***

```

No

Closes #46351 from huanliwang-db/race.

Authored-by: Huanli Wang <huanli.wangdatabricks.com>

Closes #46415 from huanliwang-db/race-3.5.

Authored-by: Huanli Wang <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
HeartSaVioR pushed a commit that referenced this pull request May 18, 2024
…oading and snapshotting

* When we close the hdfs state store, we should only remove the entry from `loadedMaps` rather than doing the active data cleanup. JVM GC should be able to help us GC those objects.
* we should wait for the maintenance thread to stop before unloading the providers.

There are two race conditions between state store snapshotting and state store unloading which could result in query failure and potential data corruption.

Case 1:
1. the maintenance thread pool encounters some issues and call the [stopMaintenanceTask,](https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774) this function further calls [threadPool.stop.](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587) However, this function doesn't wait for the stop operation to be completed and move to do the state store [unload and clear.](https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778)
2. the provider unload will [close the state store](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721) which [clear the values of loadedMaps](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355) for HDFS backed state store.
3. if the not-yet-stop maintenance thread is still running and trying to do the snapshot, but the data in the underlying `HDFSBackedStateStoreMap` has been removed. if this snapshot process completes successfully, then we will write corrupted data and the following batches will consume this corrupted data.

Case 2:

1. In executor_1, the maintenance thread is going to do the snapshot for state_store_1, it retrieves the `HDFSBackedStateStoreMap` object from the loadedMaps, after this, the maintenance thread [releases the lock of the loadedMaps](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751).
2. state_store_1 is loaded in another executor, e.g. executor_2.
3. another state store, state_store_2, is loaded on executor_1 and [reportActiveStoreInstance](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871) to driver.
4. executor_1 does the [unload](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713) for those no longer active state store which clears the data entries in the `HDFSBackedStateStoreMap`
5. the snapshotting thread is terminated and uploads the incomplete snapshot to cloud because the [iterator doesn't have next element](https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634) after doing the clear.
6. future batches are consuming the corrupted data.

No

```
[info] Run completed in 2 minutes, 55 seconds.
[info] Total number of tests run: 153
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 153, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 271 s (04:31), completed May 2, 2024, 6:26:33 PM
```
before this change

```
[info] - state store unload/close happens during the maintenance *** FAILED *** (648 milliseconds)
[info]   Vector("a1", "a10", "a11", "a12", "a13", "a14", "a15", "a16", "a17", "a18", "a19", "a2", "a20", "a3", "a4", "a5", "a6", "a7", "a8", "a9") did not equal ArrayBuffer("a8") (StateStoreSuite.scala:414)
[info]   Analysis:
[info]   Vector1(0: "a1" -> "a8", 1: "a10" -> , 2: "a11" -> , 3: "a12" -> , 4: "a13" -> , 5: "a14" -> , 6: "a15" -> , 7: "a16" -> , 8: "a17" -> , 9: "a18" -> , 10: "a19" -> , 11: "a2" -> , 12: "a20" -> , 13: "a3" -> , 14: "a4" -> , 15: "a5" -> , 16: "a6" -> , 17: "a7" -> , 18: "a8" -> , 19: "a9" -> )
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.$anonfun$new$39(StateStoreSuite.scala:414)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuiteBase.tryWithProviderResource(StateStoreSuite.scala:1663)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.$anonfun$new$38(StateStoreSuite.scala:394)
18:32:09.694 WARN org.apache.spark.sql.execution.streaming.state.StateStoreSuite:

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.state.StateStoreSuite, threads: ForkJoinPool.commonPool-worker-1 (daemon=true) =====
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.org$scalatest$BeforeAndAfter$$super$runTest(StateStoreSuite.scala:90)
[info]   at org.scalatest.BeforeAndAfter.runTest(BeforeAndAfter.scala:213)
[info]   at org.scalatest.BeforeAndAfter.runTest$(BeforeAndAfter.scala:203)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.runTest(StateStoreSuite.scala:90)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.org$scalatest$BeforeAndAfter$$super$run(StateStoreSuite.scala:90)
[info]   at org.scalatest.BeforeAndAfter.run(BeforeAndAfter.scala:273)
[info]   at org.scalatest.BeforeAndAfter.run$(BeforeAndAfter.scala:271)
[info]   at org.apache.spark.sql.execution.streaming.state.StateStoreSuite.run(StateStoreSuite.scala:90)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)
[info] Run completed in 2 seconds, 4 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***

```

No

Closes #46351 from huanliwang-db/race.

Authored-by: Huanli Wang <huanli.wangdatabricks.com>

Closes #46415 from huanliwang-db/race-3.5.

Authored-by: Huanli Wang <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants