[SPARK-16861][PYSPARK][CORE] Refactor PySpark accumulator API on top of Accumulator V2 #14467
Conversation
This reverts commit 3c1ea65.
…deprecated-accumulator-api
Test build #63140 has finished for PR 14467 at commit
: JList[Array[Byte]] = synchronized {
override def copy(): PythonAccumulatorV2 = {
  val newAcc = new PythonAccumulatorV2(serverHost, serverPort)
  newAcc._acc = this._acc
Probably a dumb question but does this need to be copied/cloned?
So the new accumulator API uses the copy method inside of copyAndReset, which is used inside of writeReplace during serialization. More generally, you want to ship a "clean" accumulator to the workers rather than something which may already have some accumulated values inside of it.
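For reference, the plumbing described here looks roughly like this (a paraphrased sketch, not the exact Spark source):

// A simplified sketch of how AccumulatorV2 wires copy(), copyAndReset(),
// and writeReplace() together (paraphrased, not the exact Spark source).
abstract class SketchAccumulatorV2[IN, OUT] extends Serializable {
  def copy(): SketchAccumulatorV2[IN, OUT]
  def reset(): Unit

  // Ship a "clean" accumulator: clone it, then wipe the clone's state.
  def copyAndReset(): SketchAccumulatorV2[IN, OUT] = {
    val copyAcc = copy()
    copyAcc.reset()
    copyAcc
  }

  // Invoked by Java serialization when the driver sends the accumulator to
  // the workers; it substitutes the fresh copy produced by copyAndReset().
  protected def writeReplace(): Any = copyAndReset()
}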
OK but this keeps the original accumulator's data -- it even makes both point to the same instance of the data. I suspect I'm missing something.
Yeah, copy() could be used elsewhere besides inside of copyAndReset() - although I don't see any blindingly obvious places where it is used inside of Spark. I'll switch this to be more like CollectionAccumulator: have a specialized copyAndReset and do a deep copy on copy.
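Something along these lines, mirroring CollectionAccumulator's approach (a sketch; the class and field names are illustrative):

import java.util.{ArrayList => JArrayList, Collections, List => JList}

// Sketch of the proposed change: a cheap copyAndReset that skips the deep
// copy, plus a copy() that actually gives the new accumulator its own
// backing list, mirroring CollectionAccumulator.
class PythonAccumulatorSketch(serverHost: String, serverPort: Int) {
  private val _acc: JList[Array[Byte]] =
    Collections.synchronizedList(new JArrayList[Array[Byte]]())

  // Workers only ever need a fresh, empty accumulator - no point copying
  // data that is about to be reset away.
  def copyAndReset(): PythonAccumulatorSketch =
    new PythonAccumulatorSketch(serverHost, serverPort)

  // Any other caller of copy() gets an independent backing list.
  def copy(): PythonAccumulatorSketch = {
    val newAcc = new PythonAccumulatorSketch(serverHost, serverPort)
    newAcc._acc.addAll(this._acc)
    newAcc
  }
}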
… synchronized. Not that I think that's coming anytime soon - but it isn't guaranteed not to happen in the new API.
…oid the deep copy when not needed
Test build #63182 has finished for PR 14467 at commit
Test build #63186 has finished for PR 14467 at commit
Failure seems unrelated, Jenkins retest this please.
Test build #63190 has finished for PR 14467 at commit
Test build #63195 has finished for PR 14467 at commit
Test build #63205 has finished for PR 14467 at commit
jenkins retest this please.
Test build #63248 has finished for PR 14467 at commit
self._jvm.java.util.ArrayList(),
self._jvm.PythonAccumulatorParam(host, port))
self._javaAccumulator = self._jvm.PythonAccumulatorV2(host, port)
self._jsc.sc().register(self._javaAccumulator)
I cannot fully understand why an accumulator is created for every instance of SparkContext. I see it is used when the attribute _jrdd is called, but that still doesn't clear things up for me :(
So in general you have one SparkContext and many RDDs. The accumulator here doesn't represent a specific accumulator; rather, it's the general mechanism that all of the Python accumulators are built on top of. The design is certainly a bit confusing if you try to think of it as a regular accumulator - I found it helped to look at how the Scala-side "merge" is implemented.
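For reference, the mechanism can be sketched like this (names are illustrative, not the real API): workers append pickled updates as byte arrays, and the driver-side "merge" streams them to the Python driver process, which routes each update to the user's per-variable Accumulator objects.

import java.io.{BufferedOutputStream, DataOutputStream}
import java.net.Socket
import java.util.{ArrayList => JArrayList, Collections, List => JList}
import scala.collection.JavaConverters._

// Conceptual sketch: one JVM accumulator per SparkContext acts as a
// transport for all Python accumulators rather than holding one variable.
class PythonAccumulatorTransport(serverHost: String, serverPort: Int) {
  private val updates: JList[Array[Byte]] =
    Collections.synchronizedList(new JArrayList[Array[Byte]]())

  // Worker side: each finished task contributes its pickled updates.
  def add(update: Array[Byte]): Unit = updates.add(update)

  // Driver side: stream the accumulated byte arrays to the Python driver,
  // which dispatches each update to the matching Python Accumulator.
  def flushToPythonDriver(): Unit = {
    val socket = new Socket(serverHost, serverPort)
    val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream))
    out.writeInt(updates.size)
    for (array <- updates.asScala) {
      out.writeInt(array.length)
      out.write(array)
    }
    out.flush()
    socket.close()
  }
}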
val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
out.writeInt(val2.size)
for (array <- val2.asScala) {
out.writeInt(otherPythonAccumulator._acc.size)
This is looking good. There is one more possible thread-safety issue here. Here we access the size of another (synchronized) list, then iterate over it. Both could be a problem if the list is changed somewhere during this process. I think we'd want to explicitly synchronize on otherPythonAccumulator._acc for this whole block, because that's what Collections.synchronizedList is doing in all other cases to guard access to it.
Either that or confirm that this would never be called in a multi-threaded context.
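Concretely, the suggestion amounts to something like this (a self-contained sketch of the pattern; writeUpdates is an illustrative helper, not code from the PR):

import java.io.DataOutputStream
import java.util.{List => JList}
import scala.collection.JavaConverters._

object SyncSketch {
  // Hold the list's monitor across both the size read and the iteration.
  // Collections.synchronizedList only guards individual method calls, so a
  // compound read-then-iterate needs an explicit synchronized block on the
  // list itself.
  def writeUpdates(acc: JList[Array[Byte]], out: DataOutputStream): Unit =
    acc.synchronized {
      out.writeInt(acc.size)
      for (array <- acc.asScala) {
        out.writeInt(array.length)
        out.write(array)
      }
    }
}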
So this code path is only taken during merging on the driver side - and there is no reason to merge the same accumulated value into two different accumulators at the same time. You can also see the merge logic inside of DAGScheduler.scala & TaskMetrics (although not applicable here since the Python accumulator isn't a task metric) and verify that the updates are merged one at a time.
I see, I'm wondering about the value of future-proofing this with more consistent synchronization, vs the downside (code complexity, performance). It doesn't make the code more complex, and if this is really only accessed by a single thread, the lock overhead is virtually 0 in this context. Is it better to just synchronize for future correctness?
Sure, I can do that - on the other hand, we already don't do this inside of CollectionAccumulator (it uses a raw ArrayList without any synchronization), so if this assumption were to change we would also break all collection accumulators.
I've gone ahead and added the requested synchronization :)
Test build #64030 has finished for PR 14467 at commit
jenkins retest this please
Test build #64087 has finished for PR 14467 at commit
Just a heads up - I'm going on vacation next week so I will be slower responding to comments.
Back from vacation - if anyone has review bandwidth I'd love to get this cleanup in :)
Test build #65119 has finished for PR 14467 at commit
Test build #65274 has finished for PR 14467 at commit
private class PythonAccumulatorParam(@transient private val serverHost: String, serverPort: Int)
  extends AccumulatorParam[JList[Array[Byte]]] {
private[spark] class PythonAccumulatorV2(@transient private val serverHost: String, serverPort: Int)
  extends AccumulatorV2[JList[Array[Byte]], JList[Array[Byte]]] {
Ah, can we use CollectionAccumulator? It's a specialization for accumulating a list of things, so it might be good to leverage.
So, not really - the type signatures don't quite match. We do an add of a list of values on the worker threads, but CollectionAccumulator expects add to take the element type being accumulated into the list. We could use merge or add a separate add function, but it would be the only place where merge is called on the workers, and I'm hesitant to do that since at that point we really aren't looking much like the accumulators.
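To make the mismatch concrete, here is a paraphrase of the two shapes (illustrative traits, not the real Spark classes):

import java.util.{List => JList}

// CollectionAccumulator-style: add() takes one element of the list's
// element type.
trait CollectionStyle[T] {
  def add(v: T): Unit
  def value: JList[T]
}

// What the Python bridge does: add() receives a whole batch of pickled
// updates per task, i.e. the input type is itself a list.
trait PythonBridgeStyle {
  def add(values: JList[Array[Byte]]): Unit
  def value: JList[Array[Byte]]
}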
ping @srowen
Yeah, CollectionAccumulator would need you to add Array[Byte] to get out List[Array[Byte]]. But actually, the only thing that's added is a singleton list of one Array[Byte] so the usage is already expecting to add one element, not a list.
Then you don't need a custom implementation at all. Just CollectionAccumulator[Array[Byte]]?
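That is, something like the following sketch (sendToPythonDriver is a hypothetical helper standing in for the socket I/O; the real change would still need that logic):

import java.util.{List => JList}
import org.apache.spark.util.{AccumulatorV2, CollectionAccumulator}

// Sketch of the suggestion: reuse CollectionAccumulator[Array[Byte]] for
// the accumulation plumbing and only override merge() to forward the
// incoming updates to the Python driver.
class PythonAccumulatorSketchV2(serverHost: String, serverPort: Int)
  extends CollectionAccumulator[Array[Byte]] {

  override def merge(other: AccumulatorV2[Array[Byte], JList[Array[Byte]]]): Unit = {
    // Instead of concatenating lists, push the other side's updates to Python.
    sendToPythonDriver(serverHost, serverPort, other.value)
  }

  private def sendToPythonDriver(host: String, port: Int, vals: JList[Array[Byte]]): Unit = {
    // open a socket to (host, port), write vals.size, then each array -
    // see the merge snippet earlier in the thread
  }
}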
I gave this a shot locally -- haven't tested it though. Does this make sense or do you see a reason this won't work?
That would probably work - but at that point we aren't really getting anything from using the CollectionAccumulator base trait are we?
Also, we do still need most of the custom logic (since it's for copying the data from the JVM back to Python during the "merge" step).
You're not having to reimplement all the other methods. That seems like a win? This thing is fundamentally accumulating a collection of things too. @davies?
I don't know about the merge logic. I assume that something here is required to send the data back to the Python driver process in order for the accumulator to work, but I don't know this well. At least, that can stay as-is for now. I didn't actually change it much at all in the branch above, it's mostly indentation changes.
As you pointed out it isn't a big change so I'll go ahead and swap it.
Test build #65394 has finished for PR 14467 at commit
Test build #65678 has finished for PR 14467 at commit
/**
 * Value function - not expected to be called for Python.
 */
def value: Unit = {
Sorry for being naive, but I'm not getting why there's an empty function here?
Test build #65728 has finished for PR 14467 at commit
That LGTM. I'll leave it open a bit for any more comments.
Merged to master |
What changes were proposed in this pull request?
Move the internals of the PySpark accumulator API off the old deprecated API and onto the new accumulator API.
How was this patch tested?
The existing PySpark accumulator tests (both unit tests and the doc tests at the start of accumulator.py).