[SPARK-18857][SQL] Don't use Iterator.duplicate for incrementalCollect in Thrift Server
#16440
Conversation
…lect` in Thrift Server. Since Scala `Iterator.duplicate` uses a queue to buffer all items between both iterators, it causes GC pressure and hangs. We should not use it, especially for `spark.sql.thriftServer.incrementalCollect`. https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/Iterator.scala#L1262-L1300
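For illustration, a minimal self-contained sketch of that buffering behavior in plain Scala (not Spark code; the small range stands in for a large result set):

```scala
object DuplicateBufferingDemo extends App {
  // Iterator.duplicate backs both iterators with one source plus a shared
  // queue: elements consumed by the leading iterator are buffered until the
  // lagging iterator catches up.
  val (a, b) = Iterator.range(0, 10).duplicate

  a.foreach(_ => ())   // fully consuming `a` queues all 10 elements for `b`
  println(b.length)    // prints 10: `b` replays the buffered elements

  // With millions of rows, that queue pins the entire result set in driver
  // memory -- the GC pressure and hangs described above.
}
```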
Test build #70751 has finished for PR 16440 at commit
private def useIncrementalCollect: Boolean = {
Does this need to be a `def`? Will it ever change?
Yep. When we use beeline, we can control this as follows.
0: jdbc:hive2://localhost:10000> set spark.sql.thriftServer.incrementalCollect=false;
+--------------------------------------------+--------+--+
| key | value |
+--------------------------------------------+--------+--+
| spark.sql.thriftServer.incrementalCollect | false |
+--------------------------------------------+--------+--+
1 row selected (0.015 seconds)
0: jdbc:hive2://localhost:10000> select * from t;
+----+--+
| a |
+----+--+
+----+--+
No rows selected (0.054 seconds)

private var result: DataFrame = _
private var resultList: Option[Array[org.apache.spark.sql.Row]] = _
Write `SparkRow` for consistency? And init to `None` explicitly?
Thank you for the review, @srowen.
Sure!
resultList = None
result.toLocalIterator.asScala
} else {
  if (resultList.isEmpty) {
I agree that this makes the implicit buffering explicit. So, if an iterator is duplicated into A and B, and all of A is consumed, then B will internally buffer everything from A so it can be replayed? And in our case, we know that A will be entirely consumed? Then these are basically the same, yes.
But does that solve the problem? This now always stores the whole result set locally. Is this avoiding a second whole copy of it?
What if you always just return result.collect().iterator here -- is the problem re-collecting the result every time?
Yes. The following happens with `Iterator.duplicate`.

> So, if an iterator is duplicated into A and B, and all of A is consumed, then B will internally buffer everything from A so it can be replayed?

And the whole-result storing happens at line 122 and lines 245-246, for `spark.sql.thriftServer.incrementalCollect=false` only.

resultList = Some(result.collect())
resultList.get.iterator
I suppose I'm asking: why is this an improvement? Because in the new version, you also buffer the whole result into memory locally.
Correct. There are two cases, and this PR mainly targets `incrementalCollect=true`.

> you also buffer the whole result into memory locally.

First of all, before SPARK-16563, `FETCH_FIRST` was not supported correctly because an iterator can be traversed only once.

- Case 1) `incrementalCollect=false`
  Creating the whole result in memory once via `result.collect` was the original Spark way before SPARK-16563. If we can create the whole result once during query processing, I think we can keep it for `FETCH_FIRST` with fewer side effects. So, I keep that behavior. If this is not allowed, we have to go with Case 2.
- Case 2) `incrementalCollect=true`
  In this case, by definition, we cannot create the whole result set in memory at any point during query processing. There is no way to find the first row with the iterator alone, so `result.toLocalIterator.asScala` should be called whenever `FETCH_FIRST` is used.
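Putting the two cases together, the selection logic reads roughly like the following sketch. It paraphrases the diff snippets quoted in this thread (`result`, `resultList`, `useIncrementalCollect`, `SparkRow`) and is not the exact merged source:

```scala
import scala.collection.JavaConverters._

// Sketch only: assumes the surrounding class fields discussed above
// (result: DataFrame, resultList: Option[Array[SparkRow]]).
private def resultIterator(): Iterator[SparkRow] = {
  if (useIncrementalCollect) {
    // Case 2: never materialize the whole result on the driver; every
    // FETCH_FIRST rebuilds the iterator from the query.
    resultList = None
    result.toLocalIterator.asScala
  } else {
    // Case 1: collect once, cache, and replay from memory on FETCH_FIRST.
    if (resultList.isEmpty) {
      resultList = Some(result.collect())
    }
    resultList.get.iterator
  }
}
```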
OK, I believe I get it now. I see your approach and it makes sense.
The only real change here is that you hold on to a reference to the whole data set rather than collect() it into memory. Maybe that's the right thing to do, but that's the only thing I'm wondering about. Previously it seems like it would collect() each time anyway?
Just wondering whether it's actually simpler to avoid keeping a reference to the whole data set, or whether that defeats the purpose.
collect() is still intended to be called only once logically. The following is the reason why there exist two collect() call sites.
When `useIncrementalCollect=false`, collect() is called once at line 244 and `resultList` will not be `None`.
However, if a user executes a query with `useIncrementalCollect=true` and then changes their mind and turns it off with `useIncrementalCollect=false`, the next `getNextRowSet(FetchOrientation.FETCH_FIRST)` should check `resultList` and fill it by calling collect() once at line 123.
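In other words, the `FETCH_FIRST` path behaves roughly like this fragment (`getNextRowSet` and `FetchOrientation` come from the Hive Thrift layer; `iter` and `resultIterator()` refer to the sketch above, and surrounding details are elided):

```scala
// Fragment of getNextRowSet, paraphrased from the discussion above.
if (order.equals(FetchOrientation.FETCH_FIRST)) {
  // Rebuild the iterator: replays the cached resultList when incremental
  // collect is off, or re-executes via toLocalIterator when it is on.
  iter = resultIterator()
}
```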
Test build #70771 has finished for PR 16440 at commit
srowen left a comment
OK, I think that makes sense. @alicegugu @ericl @rxin do you have any comments?
Thank you again, @srowen. Hi, @alicegugu, @ericl, @rxin. Could you give me your opinions on this?
ericl left a comment
Overall looks good, but I think some comments would be helpful.
private def useIncrementalCollect: Boolean = {
  sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
Can we document this configuration flag in SQLConf?
Oh, I see. I'll register this configuration into SQLConf explicitly.
private var result: DataFrame = _
private var resultList: Option[Array[SparkRow]] = _
Can we document these two fields? E.g., we cache the returned rows in `resultList` in case the user wants to use `FETCH_FIRST`. This is only used when incremental collect is set to false; otherwise `FETCH_FIRST` will trigger re-execution.
Sure! Thank you for the review, @ericl.
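A sketch of what that documentation might look like on the fields in question, based on the wording above (not the exact committed comment):

```scala
private var result: DataFrame = _

/**
 * Caches the returned rows so FETCH_FIRST can replay them. Only used when
 * incremental collect is disabled; with incremental collect enabled,
 * FETCH_FIRST triggers re-execution instead.
 */
private var resultList: Option[Array[SparkRow]] = None
```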
The PR is updated as follows.
Test build #70942 has finished for PR 16440 at commit
  .stringConf
  .createOptional

val THRIFTSERVER_INCREMENTAL_COLLECT =
I think this makes this option more "public"; I see some other options here marked as .internal(). I don't know whether this is meant to be further exposed, so it might be more conservative to make it internal for the moment. But yes, it seems sensible to make a config key constant like this.
Oh, I see. I'll make it internal. Given the current usage, internal seems better.
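A plausible shape for the internal registration, combining the diff fragment and the `.internal()` suggestion above (the exact builder API and doc string in Spark's SQLConf may differ):

```scala
val THRIFTSERVER_INCREMENTAL_COLLECT =
  SQLConfigBuilder("spark.sql.thriftServer.incrementalCollect")
    .internal()  // keep it out of the public, documented config surface
    .doc("When true, the Thrift Server returns result rows incrementally " +
      "via toLocalIterator instead of collecting the whole result set " +
      "into driver memory.")
    .booleanConf
    .createWithDefault(false)
```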
Test build #70989 has finished for PR 16440 at commit
Thank you for approving, @srowen.
Hi, @srowen.
Thank you for merging, @srowen!
Merged to 2.1/2.0 as well. I agree, it's a clean fix for a non-trivial problem.
Thank you so much, @srowen!
What changes were proposed in this pull request?
To support `FETCH_FIRST`, SPARK-16563 used Scala `Iterator.duplicate`. However, Scala `Iterator.duplicate` uses a queue to buffer all items between both iterators, which causes GC pressure and hangs for queries with a large number of rows. We should not use this, especially for `spark.sql.thriftServer.incrementalCollect`.
https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/Iterator.scala#L1262-L1300
How was this patch tested?
Pass the existing tests.