- 
                Notifications
    You must be signed in to change notification settings 
- Fork 28.9k
[SPARK-28967][CORE] Include cloned version of "properties" to avoid ConcurrentModificationException #25672
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…oncurrentModificationException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, pending tests
| Test build #110090 has finished for PR 25672 at commit  
 | 
| retest this, please | 
| Test build #110109 has finished for PR 25672 at commit  
 | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks reasonable pending tests. I wonder why it happens for 0 partitions... something gets short-circuited and stuff happens locally and then suddenly you're modifying the same object in the JVM?
| 
 | 
| retest this, please | 
| Test build #110142 has finished for PR 25672 at commit  
 | 
| @felixcheung @srowen | 
| Merged to master. I am not sure if it affects 2.4; the code is different around the change. If it does you can evaluate in another PR. | 
| Thanks all for reviewing and merging! It's caused by SPARK-26714 which is applied to 3.0, so backport is not needed. | 
…oncurrentModificationException ### What changes were proposed in this pull request? This patch fixes the bug which throws ConcurrentModificationException when job with 0 partition is submitted via DAGScheduler. ### Why are the changes needed? Without this patch, structured streaming query throws ConcurrentModificationException, like below stack trace: ``` 19/09/04 09:48:49 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception java.util.ConcurrentModificationException at java.util.Hashtable$Enumerator.next(Hashtable.java:1387) at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424) at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.map(TraversableLike.scala:237) at scala.collection.TraversableLike.map$(TraversableLike.scala:230) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.util.JsonProtocol$.mapToJson(JsonProtocol.scala:514) at org.apache.spark.util.JsonProtocol$.$anonfun$propertiesToJson$1(JsonProtocol.scala:520) at scala.Option.map(Option.scala:163) at org.apache.spark.util.JsonProtocol$.propertiesToJson(JsonProtocol.scala:519) at org.apache.spark.util.JsonProtocol$.jobStartToJson(JsonProtocol.scala:155) at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:79) at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:149) at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:217) at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37) at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28) at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37) at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:99) at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:84) at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:102) at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:102) at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:97) at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:93) at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319) at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:93) ``` Please refer https://issues.apache.org/jira/browse/SPARK-28967 for detailed reproducer. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Newly added UT. Also manually tested via running simple structured streaming query in spark-shell. Closes apache#25672 from HeartSaVioR/SPARK-28967. Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]> Signed-off-by: Sean Owen <[email protected]>
What changes were proposed in this pull request?
This patch fixes the bug which throws ConcurrentModificationException when job with 0 partition is submitted via DAGScheduler.
Why are the changes needed?
Without this patch, structured streaming query throws ConcurrentModificationException, like below stack trace:
Please refer https://issues.apache.org/jira/browse/SPARK-28967 for detailed reproducer.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Newly added UT. Also manually tested via running simple structured streaming query in spark-shell.