[SPARK-13186][Streaming]Migrate away from SynchronizedMap #11104
|
@holdenk |
| import org.apache.spark.streaming.{Milliseconds, StreamingContext} | ||
|
|
||
| import java.util.concurrent.ConcurrentHashMap | ||
| import scala.collection.convert.decorateAsScala._ |
I think this needs improved import ordering https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports
What is decorateAsScala needed for?
So based on my reading, I think decorateAsScala is being used in place of the standard Java collection conversions to allow updates to the underlying Java backing type. However, this breaks the concurrency guarantees, so it doesn't really buy us anything. (If we were in a situation where concurrency didn't matter and we just wanted to interact with some Java types, it could be nice.)
of course @huaxingao feel free to correct my understanding if I'm off base :)
@holdenk Thanks for your comments. Yes, that's why I have decorateAsScala there.
I am working on changing the code to use the Java API for +=, put and getOrElseUpdate. Do we also need a concurrency guarantee for ++ and --?
I haven't dug into the code enough to say there is any place where it would be safe to not have concurrency guarantees. It's probably easier to just use the safe methods: for ++ you can use addAll and there is remove as well.
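As a rough illustration of the suggestion above, the sketch below calls the Java API on a ConcurrentHashMap directly instead of going through a Scala wrapper. The object name, keys, and values are made up for the example and are not taken from the PR. Note that for a map the bulk-insert method is putAll (addAll is the equivalent on Java collections such as queues and lists).

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical example object; names and values are illustrative only.
object JavaApiExample {
  def demo(): ConcurrentHashMap[String, Int] = {
    val m = new ConcurrentHashMap[String, Int]()

    // Instead of `m += ("a" -> 1)` on a Scala wrapper:
    m.put("a", 1)

    // Insert only if the key is absent; a single atomic operation.
    m.putIfAbsent("b", 2)

    // Instead of `++`: bulk insert from another java.util.Map.
    val extra = new java.util.HashMap[String, Int]()
    extra.put("c", 3)
    m.putAll(extra)

    // Instead of `-=` / `--`: remove keys one at a time.
    m.remove("a")

    m
  }
}
```

Each individual call is thread safe, but a sequence of calls (for example a get followed by a put) is still not atomic as a whole.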
|
Thanks for the PR and getting started on this :) So the first minor thing we can update easily is import ordering, which in many of the files should follow the style guide https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports The next is that the asScala conversion that's used in many places is removing the concurrency guarantees:
I think rather than wrapping in underlying.synchronized it might be easier to have the operation on the Java API as we did in #11059 (although it's a bit more painful to code this way). We should also coordinate with @ted-yu who I believe is working on corresponding scala style rules to prevent people from using the unsafe scala version. |
| val awsCredentials = KinesisTestUtils.getAWSCredentials() | ||
| val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])] | ||
| with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])] | ||
| val collectedData = new ConcurrentHashMap[Time, (Array[SequenceNumberRanges], Seq[Int])].asScala |
This conversion doesn't result in a thread safe hashmap sadly. See the comment in the PR for more details.
|
@huaxingao did a first quick pass :) |
|
@holdenk |
|
Sure I'll take another look. |
| val count = result.getOrElseUpdate(kv._1, 0) + kv._2 | ||
| result.put(kv._1, count) | ||
| result.synchronized { | ||
| val count = result.getOrElseUpdate(kv._1, 0) + kv._2 |
I think putIfAbsent on the underlying Java type might do what you are looking for here. https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentHashMap.html#putIfAbsent(K,%20V)
@holdenk
Thanks for your quick reply.
I initially changed
val count = result.getOrElseUpdate(kv._1, 0) + kv._2
to
result.putIfAbsent(kv._1, 0)
val count = result.get(kv._1) + kv._2
but the test failed for me. I guess a different thread can come in between the two lines, so the update is no longer atomic. So I used a synchronized block instead.
So you would probably want to try val count = result.putIfAbsent(kv._1, 0) + kv._2, although looking at the original code it had a race condition. If we're going to put a synchronized block around the update we could just use a regular mutable.HashMap.
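For context on why the putIfAbsent attempts are fragile: putIfAbsent returns the previous value (or null when the key was absent), and a read followed by a separate put is still two operations that another thread can interleave with. One lock-free alternative, shown here only as a sketch and not as what this PR adopted, is a compare-and-set retry loop built from putIfAbsent and replace, both of which exist on ConcurrentMap in Java 7. The object and method names are hypothetical.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec

// Hypothetical helper; not part of the PR.
object AtomicCounts {
  /** Atomically adds `delta` to the count under `key`, retrying if another thread wins the race. */
  @tailrec
  def addCount(
      counts: ConcurrentHashMap[String, java.lang.Long],
      key: String,
      delta: Long): Unit = {
    val current: java.lang.Long = counts.get(key)
    val updated =
      if (current == null) {
        // No entry yet: succeeds only if nobody inserted one in the meantime.
        counts.putIfAbsent(key, java.lang.Long.valueOf(delta)) == null
      } else {
        // Entry exists: succeeds only if the value is still the one we read.
        counts.replace(key, current, java.lang.Long.valueOf(current.longValue + delta))
      }
    if (!updated) addCount(counts, key, delta)
  }
}
```

This keeps the ConcurrentHashMap but is noticeably more code than a synchronized block around a plain mutable.HashMap, which is the trade-off discussed in the following comments.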
+1 for using synchronized + mutable.HashMap. In addition, toMap in ret.toMap.foreach can be removed. Hence I would recommend changing the code to
    val result = new mutable.HashMap[String, Long]()
    stream.map(_._2).countByValue().foreachRDD { r =>
      r.collect().foreach { kv =>
        result.synchronized {
          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
          result.put(kv._1, count)
        }
      }
    }
And also change assert to assert(result.synchronized { sent === result })
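The two suggestions above (guard the update, and guard the read in the assertion) rely on both sides locking the same object. A minimal standalone sketch of that pattern, without Spark and with hypothetical names, might look like this:

```scala
import scala.collection.mutable

// Hypothetical standalone version of the pattern; not the test code from the PR.
object SynchronizedCounts {
  private val result = new mutable.HashMap[String, Long]()

  /** Read-modify-write under the map's own lock, so the whole update is atomic. */
  def add(key: String, delta: Long): Unit = result.synchronized {
    val count = result.getOrElseUpdate(key, 0L) + delta
    result.put(key, count)
  }

  /** Readers take the same lock and copy, so they never observe a half-applied update. */
  def snapshot(): Map[String, Long] = result.synchronized {
    result.toMap
  }
}
```

Since every access goes through result.synchronized, the map itself does not need to be a concurrent collection.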
@holdenk @zsxwing
I tried val count = result.putIfAbsent(kv._1, 0) + kv._2, but the test failed for me. So I will change to mutable.HashMap and put the update in a synchronized block.
Is it OK to use mutable.HashMap and a synchronized block in this file only, but use java.util.concurrent.ConcurrentHashMap in the other files (StreamingListenerSuite, KinesisStreamTests and FileInputDStream)? Or is it better to use mutable.HashMap and a synchronized block in all the files that have SynchronizedMap?
Using ConcurrentHashMap in other files looks fine. I don't see any potential issues.
|
So I think the |
|
By the way, is there any PR removing |
|
@zsxwing |
| import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, SynchronizedMap} | ||
| import java.util.concurrent.ConcurrentHashMap | ||
|
|
||
| import scala.collection.convert.decorateAsScala._ |
Please use import scala.collection.JavaConverters._
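For reference, scala.collection.JavaConverters._ provides the same asScala decorator through the commonly used import. A small sketch, assuming a Scala version where JavaConverters is available (it is deprecated from 2.13 onward in favor of scala.jdk.CollectionConverters); the object and method names are hypothetical:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Hypothetical example; shows the import in use for a read-only copy.
object ConvertersExample {
  /** Copies the current contents into an immutable Scala Map, e.g. for use in an assertion. */
  def snapshot(m: ConcurrentHashMap[String, Int]): Map[String, Int] =
    m.asScala.toMap
}
```

Writes in this sketch would still go through the Java methods on the ConcurrentHashMap; the conversion is used only to read.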
|
LGTM except some nits. Thanks, @huaxingao |
|
Fixed the problems. Thank you all very much for your help!! |
|
ok to test |
|
Test build #50949 has finished for PR 11104 at commit
|
|
Sorry for the file line length problem. Fixed. |
|
Test build #50950 has finished for PR 11104 at commit
|
|
@huaxingao one more style issue and it also needs to be updated with the latest master since there are now merge conflicts. |
|
Test build #51018 has finished for PR 11104 at commit
|
|
@huaxingao can you rebase this? |
|
@srowen |
|
@huaxingao OK, do you want to close this and try another PR? I can try to take it on too. |
trait SynchronizedMap in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Change to java.util.concurrent.ConcurrentHashMap instead.