Do not re-use objects in the EdgePartition/EdgeTriplet iterators. #276
Conversation
This avoids a silent data corruption issue (SPARK-1188) and has no performance impact in my measurements. It also simplifies the code.
Can one of the admins verify this patch?
Sorry, the new JIRA link is https://issues.apache.org/jira/browse/SPARK-1188. Thanks!
Thanks for sending this in. It's something that is extremely confusing in GraphX, and we need to fix it. However, I am not sure that removing the object re-use in edges is the way to fix this problem. This is actually hard to test in micro benchmarks, because you rarely get GCs in micro benchmarks. In the cases where an edge/triplet is returned by an iterator, the JVM's escape analysis cannot capture the scope and cannot do on-stack allocation, so lots of temporary objects get allocated on the heap. Allocation of short-lived objects is supposed to be cheap; the most expensive objects to GC are medium-lived ones. But the more temporary objects we allocate, the more frequently a young-gen GC happens, and the more frequent young-gen GCs are, the more likely random objects are to become medium-lived.

Maybe a better way to fix this is to leave the object re-use as is, but make sure we copy the object in every place where we return it to the user. I just looked at the code, and I think we can accomplish that by just adding a copy to EdgeRDD.compute, i.e.

```scala
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
  firstParent[(PartitionID, EdgePartition[ED])]
    .iterator(part, context).next._2.iterator.map(_.copy())
}
```
And for EdgeTripletIterator we should just keep your change (i.e. always create a new object), since that is not used internally at all.
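To see what the boundary copy buys us, here is a minimal, self-contained sketch of the re-use pitfall; the `Cell` class and `reusingIterator` helper are hypothetical stand-ins, not GraphX code. An iterator that mutates and re-yields a single object makes `toList` hand back many references to the same final value, which is exactly the silent corruption in SPARK-1188; mapping `_.copy()` at the boundary restores the expected behavior.

```scala
// Hypothetical demonstration of the re-use pitfall (not GraphX code).
class Cell(var value: Int) {
  def copy(): Cell = new Cell(value)
}

// Reuses one Cell across all elements, like the internal edge iterators do.
def reusingIterator(n: Int): Iterator[Cell] = {
  val cell = new Cell(0)
  (1 to n).iterator.map { i => cell.value = i; cell }
}

println(reusingIterator(3).toList.map(_.value))               // List(3, 3, 3): corrupted
println(reusingIterator(3).map(_.copy()).toList.map(_.value)) // List(1, 2, 3): copied at the boundary
```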
Create a copy of the Edge objects in EdgeRDD.compute(). This avoids exposing the object re-use, while still enabling the more efficient behavior for internal code.
Thanks for the comments! The description of the GC effects was very educational. I made the suggested changes. Let me know if you'd like to see something else changed.
Can we revert this change? This is where Scala fails us. The while loop is ~2x faster than iter.toArray in my micro benchmark.
Done. Yeah, I didn't benchmark this, sorry.

I guess toArray must work with a dynamic array, since it does not know the size in advance. There is an Iterator.copyToArray, but regrettably it does not return the number of elements copied, so we would lose the important assertion. I filed a feature request for this (https://issues.scala-lang.org/browse/SI-8469). Hopefully I'm not being obtuse.
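For reference, a sketch of the manual copying pattern under discussion; `copyToSizedArray` is a hypothetical stand-alone version, not the actual EdgePartition.map code. The pre-sized array avoids toArray's dynamic growth, and keeping an explicit count preserves the size assertion that Iterator.copyToArray would lose.

```scala
import scala.reflect.ClassTag

// Hypothetical helper: copy an iterator of known size into a pre-allocated
// array with a manual while loop. Unlike iter.toArray, no dynamic resizing is
// needed; unlike Iterator.copyToArray, the explicit count keeps the assertion.
def copyToSizedArray[T: ClassTag](iter: Iterator[T], size: Int): Array[T] = {
  val out = new Array[T](size)
  var i = 0
  while (iter.hasNext) {
    out(i) = iter.next()
    i += 1
  }
  assert(i == size, s"Expected $size elements, got $i")
  out
}
```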
Thanks @darabos. The change looks good to me other than the one place I pointed out.
Restore "manual" copying in EdgePartition.map(Iterator). Add a comment to discourage novices like myself from trying to simplify the code.
Merged. Thanks! This is a very important usability improvement. |
Do not re-use objects in the EdgePartition/EdgeTriplet iterators. This avoids a silent data corruption issue (https://spark-project.atlassian.net/browse/SPARK-1188) and has no performance impact in my measurements. It also simplifies the code.

Author: Daniel Darabos <[email protected]>

Closes #276 from darabos/spark-1188 and squashes the following commits:

574302b [Daniel Darabos] Restore "manual" copying in EdgePartition.map(Iterator). Add comment to discourage novices like myself from trying to simplify the code.
4117a64 [Daniel Darabos] Revert EdgePartitionSuite.
4955697 [Daniel Darabos] Create a copy of the Edge objects in EdgeRDD.compute(). This avoids exposing the object re-use, while still enables the more efficient behavior for internal code.
4ec77f8 [Daniel Darabos] Add comments about object re-use to the affected functions.
2da5e87 [Daniel Darabos] Restore object re-use in EdgePartition.
0182f2b [Daniel Darabos] Do not re-use objects in the EdgePartition/EdgeTriplet iterators. This avoids a silent data corruption issue (SPARK-1188) and has no performance impact in my measurements. It also simplifies the code.
c55f52f [Daniel Darabos] Tests that reproduce the problems from SPARK-1188.

(cherry picked from commit 7823633)
Signed-off-by: Reynold Xin <[email protected]>
This avoids a silent data corruption issue (https://spark-project.atlassian.net/browse/SPARK-1188) and has no performance impact in my measurements. It also simplifies the code. As far as I can tell, the object re-use was nothing but premature optimization.

I did actual benchmarks for all the included changes, and there is no performance difference. I am not sure where to put the benchmarks. Does Spark not have a benchmark suite?

This is an example benchmark I did:
test("benchmark") {
val builder = new EdgePartitionBuilder[Int]
for (i <- (1 to 10000000)) {
builder.add(i.toLong, i.toLong, i)
}
val p = builder.toEdgePartition
p.map(_.attr + 1).iterator.toList
}
It ran for 10 seconds both before and after this change.
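Absent a dedicated benchmark suite, one could wrap the test body above in a throwaway timing harness along these lines; the `bench` helper is a hypothetical sketch, not part of Spark.

```scala
// Hypothetical timing helper: run a block several times after a warm-up pass
// (so JIT compilation is not measured) and report the best wall-clock time.
def bench[A](label: String, runs: Int = 5)(body: => A): A = {
  var result = body // warm-up run
  var bestNanos = Long.MaxValue
  for (_ <- 1 to runs) {
    val start = System.nanoTime()
    result = body
    bestNanos = math.min(bestNanos, System.nanoTime() - start)
  }
  println(f"$label%s: best of $runs%d runs took ${bestNanos / 1e6}%.1f ms")
  result
}

// Usage, with the names from the benchmark above:
// bench("EdgePartition.map") { p.map(_.attr + 1).iterator.toList }
```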