[SPARK-30414][SQL] ParquetRowConverter optimizations: arrays, maps, plus misc. constant factors #27089
Conversation
Test build #116087 has finished for PR 27089 at commit
@cloud-fan @HyukjinKwon @dongjoon-hyun @viirya, could you take a look at this PR, which implements several small performance optimizations in ParquetRowConverter?
```diff
 private trait RepeatedConverter {
-  private var currentArray: ArrayBuffer[Any] = _
+  private[this] val currentArray = new java.util.ArrayList[Any]()
```
HyukjinKwon: @JoshRosen, sorry if I'm ignorant about this, but why do we need to change ArrayBuffer to ArrayList? It seems ArrayBuffer itself is mutable and can clear() too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JoshRosen: From prior experience I've found ArrayList to be marginally faster; I ran some quick-and-dirty non-Spark microbenchmarks and this is indeed still the case, but the gain is pretty marginal compared to other factors.

In the interests of code simplicity and clarity, I've backed out that part of the change: the code now uses and clear()s a mutable.ArrayBuffer: 6d16f59
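In the same quick-and-dirty spirit, a standalone comparison might look like the sketch below. This is my own illustration, not the benchmark from the PR, and it has no JIT warmup or forking, so a proper JMH harness would be needed for trustworthy numbers:

```scala
import scala.collection.mutable.ArrayBuffer

// Naive microbenchmark: repeatedly fill and clear each buffer type.
object BufferBench {
  def main(args: Array[String]): Unit = {
    val iters = 10000
    val n = 1000

    def time(label: String)(body: => Unit): Unit = {
      val t0 = System.nanoTime()
      body
      println(f"$label%-12s ${(System.nanoTime() - t0) / 1e6}%.1f ms")
    }

    val list = new java.util.ArrayList[Any]()
    time("ArrayList") {
      var i = 0
      while (i < iters) {
        list.clear()
        var j = 0
        while (j < n) { list.add(j); j += 1 }
        i += 1
      }
    }

    val buf = ArrayBuffer.empty[Any]
    time("ArrayBuffer") {
      var i = 0
      while (i < iters) {
        buf.clear()
        var j = 0
        while (j < n) { buf += j; j += 1 }
        i += 1
      }
    }
  }
}
```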
HyukjinKwon left a comment:
LGTM but one question
```scala
// NOTE: We can't reuse the mutable `ArrayBuffer` here and must instantiate a new buffer for the
// next value. `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored
// in row cells.
override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
```
HyukjinKwon: I think it depends on whether currentArray.toArray copies the elements or not?
JoshRosen: ArrayBuffer.toArray should always return a fresh, unshared array object (internally, it allocates a new array and then calls copyToArray).
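A tiny REPL check of that behavior (my own illustration, not from the PR):

```scala
import scala.collection.mutable.ArrayBuffer

val buf = ArrayBuffer[Any]("a", "b")
val snapshot = buf.toArray        // allocates a new Array and copies the references
buf.clear()                       // mutating the buffer afterwards...
buf += "c"
println(snapshot.mkString(", "))  // ...leaves the snapshot untouched: prints "a, b"
```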
It doesn't do copying / cloning of the array elements themselves, but that shouldn't be a problem: by design, the objects inserted into this array are unshared and immutable. The map and array converters always return unshared objects, and we always .copy() rows when inserting them into a map or array parent container (this is still true after the changes in #26993).

I did a bit of archaeology and tracked down the source of the // NOTE comment here: it was added in #7231, and at that time it looks like we were actually passing the mutable.ArrayBuffer itself to the updater: https://github.com/apache/spark/blame/360fe18a61538b03cac05da1c6d258e124df6feb/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystRowConverter.scala#L321. The comment makes sense in that context: with that older code, we would wind up with Row() objects that contained mutable.ArrayBuffers.

Later, in #7724, this was changed to pass a new GenericArrayData(currentArray.toArray) to the parent updater: c0cc0ea#diff-1d6c363c04155a9328fe1f5bd08a2f90. At that point I think we could have safely made the change to begin reusing the mutable.ArrayBuffer, since it no longer escaped its converter.
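To make that concrete, here is a minimal sketch of the post-#7724 shape under which buffer reuse is safe. This is not Spark's actual code; the Updater trait and class name below are stand-ins for the internal ParentContainerUpdater and converter:

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.util.GenericArrayData

// Stand-in for Spark's internal ParentContainerUpdater.
trait Updater { def set(value: Any): Unit }

class RepeatedConverterSketch(updater: Updater) {
  private[this] val currentArray = ArrayBuffer.empty[Any]

  // Safe to reuse the buffer across records: `end()` copies the contents
  // into a fresh array via `toArray`, so the buffer never escapes.
  def start(): Unit = currentArray.clear()
  def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
}
```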
Test build #116199 has finished for PR 27089 at commit
Merged to master.
What changes were proposed in this pull request?
This PR implements multiple performance optimizations for ParquetRowConverter, achieving some modest constant-factor wins for all fields and larger wins for map and array fields:

- Add private[this] to several vals (90cebf0).
- Cache field updaters in a fieldUpdaters array, saving two .updater() calls per field (7318785): I suspect that these are often megamorphic calls, so cutting these out seems like it could be a relatively large performance win (see the sketch below).
- Call currentRow.numFields once per start() call (e05de15): previously we'd call it once per field, and this had a significant enough cost that it was visible during profiling.
- Reuse the mutable ArrayBuffer across field reads: previously we'd allocate a new ArrayBuffer for each field read, but this isn't actually necessary because the data is already copied into a fresh array when end() constructs a GenericArrayData.
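As a rough illustration of the second and third bullets, consider the hedged sketch below. The trait names are stand-ins that only approximate Spark's internal converter interfaces, not the real code:

```scala
trait ParentContainerUpdater { def start(): Unit }
trait FieldConverter { def updater: ParentContainerUpdater }
trait MutableRow { def numFields: Int; def setNullAt(ordinal: Int): Unit }

class RowConverterSketch(
    fieldConverters: Array[FieldConverter],
    currentRow: MutableRow) {

  // Cache each field's updater once, instead of calling the (possibly
  // megamorphic) `.updater` accessor on every field in start() and end().
  private[this] val fieldUpdaters: Array[ParentContainerUpdater] =
    fieldConverters.map(_.updater)

  def start(): Unit = {
    val numFields = currentRow.numFields // read once per record, not per field
    var i = 0
    while (i < numFields) {
      fieldUpdaters(i).start()
      currentRow.setNullAt(i)
      i += 1
    }
  }
}
```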
Why are the changes needed?
To improve Parquet read performance; this is complementary to #26993's (orthogonal) improvements for nested struct read performance.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests, plus manual benchmarking with both synthetic and realistic schemas (similar to the ones in #26993). I've seen ~10%+ improvements in scan performance on certain real-world datasets.
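For reference, a scan benchmark along these lines can be run in spark-shell. The dataset path is made up and the use of the noop sink is my own illustration, not necessarily how the PR's benchmarks were run:

```scala
// Illustrative only: times a full Parquet scan, discarding the output.
val df = spark.read.parquet("/tmp/benchmark-dataset")
val start = System.nanoTime()
df.write.format("noop").mode("overwrite").save()
println(s"Scan took ${(System.nanoTime() - start) / 1e9} seconds")
```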