@@ -309,6 +309,13 @@ object SQLConf {
.stringConf
.createOptional

val THRIFTSERVER_INCREMENTAL_COLLECT =
@srowen (Member), Jan 6, 2017:
I think this makes this option more "public"; I see some other options here marked as .internal(). I don't know whether this is meant to be further exposed, so it might be more conservative to make it internal for the moment. But yes, it seems sensible to make a config key constant like this.

@dongjoon-hyun (Member, Author), Jan 6, 2017:
Oh, I see. I'll make that internal. Based on the current usage, internal seems to be better.

SQLConfigBuilder("spark.sql.thriftServer.incrementalCollect")
.internal()
.doc("When true, enable incremental collection for execution in Thrift Server.")
.booleanConf
.createWithDefault(false)

val THRIFTSERVER_UI_STATEMENT_LIMIT =
SQLConfigBuilder("spark.sql.thriftserver.ui.retainedStatements")
.doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
@@ -50,8 +50,13 @@ private[hive] class SparkExecuteStatementOperation(
with Logging {

private var result: DataFrame = _

// We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST.
// This is only used when `spark.sql.thriftServer.incrementalCollect` is set to `false`.
// In case of `true`, this will be `None` and FETCH_FIRST will trigger re-execution.
private var resultList: Option[Array[SparkRow]] = _
@ericl (Contributor), Jan 5, 2017:
Can we document these two fields? E.g., we cache the returned rows in resultList in case the user wants to use FETCH_FIRST; this is only used when incremental collect is set to false, otherwise FETCH_FIRST will trigger re-execution.

@dongjoon-hyun (Member, Author) replied:
Sure! Thank you for the review, @ericl.


private var iter: Iterator[SparkRow] = _
private var iterHeader: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
private var statementId: String = _

@@ -111,9 +116,15 @@ private[hive] class SparkExecuteStatementOperation(

// Reset iter to header when fetching start from first row
if (order.equals(FetchOrientation.FETCH_FIRST)) {
val (ita, itb) = iterHeader.duplicate
iter = ita
iterHeader = itb
iter = if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
resultList = None
result.toLocalIterator.asScala
} else {
if (resultList.isEmpty) {
Member commented:

I agree that this makes the implicit buffering explicit. So, if an iterator is duplicated into A and B, and all of A is consumed, then B will internally buffer everything from A so it can be replayed? And in our case, we know that A will be entirely consumed? Then these are basically the same, yes.

But does that solve the problem? This now always stores the whole result set locally. Is this avoiding a second whole copy of it?

What if you always just returned result.collect().iterator here -- is the problem that it re-collects the result every time?

@dongjoon-hyun (Member, Author) replied:

Yes, the following happens with Iterator.duplicate:

> So, if an iterator is duplicated into A and B, and all of A is consumed, then B will internally buffer everything from A so it can be replayed?

And the whole-result storing happens at line 122 and lines 245-246, only for spark.sql.thriftServer.incrementalCollect=false:

resultList = Some(result.collect())
resultList.get.iterator
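
A minimal sketch (not from the PR, plain Scala with no Spark required) of the Iterator.duplicate buffering behaviour discussed above:

```scala
// Consuming one half of a duplicated iterator forces the other half to buffer
// the consumed elements internally so that they can be replayed later.
object DuplicateBufferingDemo extends App {
  val source = Iterator(1, 2, 3)   // a plain iterator can only be traversed once
  val (a, b) = source.duplicate    // two iterators backed by the same source

  println(a.toList)                // List(1, 2, 3); b now holds a buffered copy
  println(b.toList)                // List(1, 2, 3); replayed from b's internal buffer
}
```

Once the first pass fully consumes one half, the duplicate effectively keeps the whole result in memory anyway, which is what the explicit resultList cache makes visible.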

Member replied:

I suppose I'm asking: why is this an improvement? Because in the new version, you also buffer the whole result into memory locally.

@dongjoon-hyun (Member, Author), Jan 1, 2017:

Correct. There are two cases, and this PR mainly targets incrementalCollect=true.

> you also buffer the whole result into memory locally.

First of all, before SPARK-16563, FETCH_FIRST was not supported correctly because an iterator can be traversed only once.

  • Case 1) incrementalCollect=false
    Creating the whole result in memory once via result.collect was the original Spark way before SPARK-16563.
    If we already create the whole result once during query processing, I think we can keep it for FETCH_FIRST with little side effect.
    So I keep it. If this is not allowed, we have to go with Case 2.

  • Case 2) incrementalCollect=true
    In this case, by definition, we cannot create the whole result set in memory at any time during query processing. There is no way to get back to the first row with an iterator, so result.toLocalIterator.asScala should be called whenever FETCH_FIRST is used.
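
For illustration only, a rough sketch, with a hypothetical helper name and signature (not the actual SparkExecuteStatementOperation code), of how FETCH_FIRST resolves its iterator under the two cases above:

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.{DataFrame, Row}

// Returns the iterator to serve FETCH_FIRST from, plus the (possibly updated) row cache.
def resetForFetchFirst(
    result: DataFrame,
    incrementalCollect: Boolean,
    cached: Option[Array[Row]]): (Iterator[Row], Option[Array[Row]]) = {
  if (incrementalCollect) {
    // Case 2: never hold the whole result set; re-execute and stream it back.
    (result.toLocalIterator.asScala, None)
  } else {
    // Case 1: materialize at most once with collect() and replay the cached rows afterwards.
    val rows = cached.getOrElse(result.collect())
    (rows.iterator, Some(rows))
  }
}
```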

Member replied:

OK I believe I get it now. I see your approach and it makes sense.
The only real change here is that you hold on to a reference to the whole data set here rather than collect() it into memory. Maybe that's the right thing to do but that's the only thing I'm wondering about. Previously it seems like it would collect() each time anyway?

Just wondering if it is actually simpler to avoid keeping a reference to the whole data set, or whether that defeats the purpose.

@dongjoon-hyun (Member, Author) replied:

collect() is still intended to be called only once, logically. The following is the reason why there are two collect() call sites.

When useIncrementalCollect=false, collect() is called once at line 244 and resultList will not be None.

However, if a user executes a query with useIncrementalCollect=true and then changes their mind and turns it off with useIncrementalCollect=false, the next getNextRowSet(FetchOrientation.FETCH_FIRST) should check resultList and fill it by calling collect() once at line 123.
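
To make that sequence concrete, a hypothetical walk-through (all names and values invented; plain Scala stand-ins rather than the PR's classes) showing that collect() runs at most once even when the flag is switched mid-session:

```scala
// `collect` simulates result.collect(); `stream` simulates result.toLocalIterator.
class FetchState(collect: () => Array[String], stream: () => Iterator[String]) {
  private var resultList: Option[Array[String]] = None

  def fetchFirst(incremental: Boolean): Iterator[String] =
    if (incremental) {
      resultList = None              // incremental mode never caches
      stream()
    } else {
      if (resultList.isEmpty) {      // fill lazily if execute() did not cache anything
        resultList = Some(collect())
      }
      resultList.get.iterator
    }
}

var collectCalls = 0
val op = new FetchState(
  collect = () => { collectCalls += 1; Array("row1", "row2") },
  stream = () => Iterator("row1", "row2"))

op.fetchFirst(incremental = true)    // executed with the flag on: streamed, nothing cached
op.fetchFirst(incremental = false)   // flag turned off: cache is empty, so collect() runs once
op.fetchFirst(incremental = false)   // served again from resultList
assert(collectCalls == 1)            // the single collect() described above
```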

resultList = Some(result.collect())
}
resultList.get.iterator
}
}

if (!iter.hasNext) {
@@ -227,17 +238,14 @@ private[hive] class SparkExecuteStatementOperation(
}
HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
iter = {
val useIncrementalCollect =
sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
resultList = None
result.toLocalIterator.asScala
} else {
result.collect().iterator
resultList = Some(result.collect())
resultList.get.iterator
}
}
val (itra, itrb) = iter.duplicate
iterHeader = itra
iter = itrb
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
} catch {
case e: HiveSQLException =>