[SPARK-1615] Synchronize accesses to the LiveListenerBus' event queue #544
This guards against the race condition in which we (1) dequeue an event, and (2) check for queue emptiness before (3) actually processing the event in all attached listeners. The solution is to make steps (1) and (3) atomic relative to (2).
Merged build triggered.

Merged build started.

Merged build finished.

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14462/
Maybe we should only call `release` if `eventAdded` is true. It's a minor detail, but it would also mean that `event` should never be null, which should (slightly) help the bus catch up with the workload.
Yes, I didn't realize `offer` also returns a boolean.
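A minimal, hypothetical demo of the point being made (not Spark's actual code; names are illustrative): unlike `put`, `BlockingQueue.offer` does not block when the queue is full, it returns `false`, so the semaphore permit should only be released when the event was actually enqueued.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Semaphore;

public class OfferDemo {
    public static void main(String[] args) {
        // Capacity of 1 so the second offer is rejected.
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        Semaphore permits = new Semaphore(0);

        boolean first = queue.offer("event1");   // true: capacity available
        if (first) {
            permits.release();                   // event really is in the queue
        }

        boolean second = queue.offer("event2");  // false: queue is full, event dropped
        if (second) {
            permits.release();                   // skipped: no phantom permit
        }

        // Permits now match the number of queued events exactly.
        System.out.println(permits.availablePermits()); // prints 1
    }
}
```

If `release` were called unconditionally, a consumer could acquire a permit for an event that was never enqueued and then see a null `poll` result.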
LGTM, the semantics and performance of this seem quite reasonable. The synchronized block should be virtually costless in normal operation, and I trust java.util.concurrent's implementation of Semaphore.
Looks great.

Merged build triggered.

Merged build started.

Merged build finished. All automated tests passed.

All automated tests passed.

Thanks - merged.
Original poster is @zsxwing, who reported this bug in #516.

Much of SparkListenerSuite relies on LiveListenerBus's `waitUntilEmpty()` method. As the name suggests, this waits until the event queue is empty. However, the following race condition could happen:

(1) We dequeue an event
(2) The queue is empty, we return true (even though the event has not been processed)
(3) The test asserts something assuming that all listeners have finished executing (and fails)
(4) The listeners receive and process the event

This PR makes (1) and (4) atomic by synchronizing around them. To do that, however, we must avoid using `eventQueue.take`, which is blocking and would cause a deadlock if we synchronized around it. As a workaround, we use the non-blocking `eventQueue.poll` plus a semaphore to provide the same semantics. This has been a possible race condition for a long time, but for some reason we've never run into it.

Author: Andrew Or <[email protected]>

Closes #544 from andrewor14/stage-info-test-fix and squashes the following commits:

3cbe40c [Andrew Or] Merge github.com:apache/spark into stage-info-test-fix
56dbbcb [Andrew Or] Check if event is actually added before releasing semaphore
eb486ae [Andrew Or] Synchronize accesses to the LiveListenerBus' event queue

(cherry picked from commit ee6f7e2)
Signed-off-by: Patrick Wendell <[email protected]>
Original poster is @zsxwing, who reported this bug in #516.
Much of SparkListenerSuite relies on LiveListenerBus's
`waitUntilEmpty()` method. As the name suggests, this waits until the event queue is empty. However, the following race condition could happen:
(1) We dequeue an event
(2) The queue is empty, we return true (even though the event has not been processed)
(3) The test asserts something assuming that all listeners have finished executing (and fails)
(4) The listeners receive and process the event
This PR makes (1) and (4) atomic by synchronizing around it. To do that, however, we must avoid using `eventQueue.take`, which is blocking and will cause a deadlock if we synchronize around it. As a workaround, we use the non-blocking `eventQueue.poll` + a semaphore to provide the same semantics.

This has been a possible race condition for a long time, but for some reason we've never run into it.
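The poll-plus-semaphore pattern described above can be sketched as follows. This is a minimal, hypothetical illustration, not Spark's actual LiveListenerBus code; the class and method names are invented for the example. The semaphore counts queued events, the producer releases a permit only when `offer` actually succeeds, and the consumer synchronizes around the non-blocking `poll` so that dequeue and dispatch are atomic with respect to the emptiness check.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

public class SketchBus {
    private final LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>(10);
    // Each permit corresponds to one event that is actually in the queue.
    private final Semaphore eventLock = new Semaphore(0);

    public void post(String event) {
        boolean eventAdded = eventQueue.offer(event); // non-blocking; false if full
        if (eventAdded) {
            eventLock.release(); // only release if the event was really queued
        }
    }

    public void processOne() throws InterruptedException {
        // Block here, OUTSIDE the lock, until at least one event exists.
        eventLock.acquire();
        synchronized (this) {
            // poll() is non-blocking, so holding the lock here cannot deadlock
            // the way a synchronized take() would.
            String event = eventQueue.poll();
            if (event != null) {
                // ... dispatch the event to all attached listeners ...
            }
        }
    }

    public synchronized boolean isEmpty() {
        // Cannot interleave between the consumer's dequeue and dispatch,
        // so "empty" implies "all posted events fully processed".
        return eventQueue.isEmpty();
    }

    public static void main(String[] args) throws InterruptedException {
        SketchBus bus = new SketchBus();
        bus.post("event");
        bus.processOne();
        System.out.println(bus.isEmpty()); // prints true
    }
}
```

The key design point is ordering: acquiring the semaphore before entering the synchronized block keeps the blocking wait outside the lock, while the emptiness check shares the same lock as dequeue-plus-dispatch, which is exactly the atomicity the PR description asks for.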