From 623abde9e4d613a777c4f3a199e3fc104963f631 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 8 Aug 2014 16:44:30 +0800 Subject: [PATCH 1/3] Added option to handle incremental collection, disabled by default --- .../src/main/scala/org/apache/spark/sql/SQLConf.scala | 10 ++++++++++ .../thriftserver/server/SparkSQLOperationManager.scala | 9 ++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 0fd7aaaa36eb8..89c2c515099eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -30,6 +30,7 @@ private[spark] object SQLConf { val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions" val CODEGEN_ENABLED = "spark.sql.codegen" val DIALECT = "spark.sql.dialect" + val INCREMENTAL_COLLECT_ENABLED = "spark.sql.thriftServer.incrementalCollect" object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" @@ -104,6 +105,15 @@ trait SQLConf { private[spark] def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, (autoBroadcastJoinThreshold + 1).toString).toLong + /** + * When set to true, the Hive Thrift server will collect SQL result set incrementally (one + * partition at a time) to decrease the risk of OOM on driver side. This can be useful when the + * result set is potentially large. The cost is that *the last* stage of the RDD DAG generated + * from the SQL query plan is executed sequentially, and hurts performance. + */ + private[spark] def incrementalCollectEnabled: Boolean = + getConf(INCREMENTAL_COLLECT_ENABLED, "false").toBoolean + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index dee092159dd4c..2b72540c1bad4 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -132,7 +132,14 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManage logDebug(result.queryExecution.toString()) val groupId = round(random * 1000000).toString hiveContext.sparkContext.setJobGroup(groupId, statement) - iter = result.queryExecution.toRdd.toLocalIterator + iter = { + val resultRdd = result.queryExecution.toRdd + if (hiveContext.incrementalCollectEnabled) { + resultRdd.toLocalIterator + } else { + resultRdd.collect().iterator + } + } dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray setHasResultSet(true) } catch { From 43ce3aa5fbde8eda05013336682e02bd9c7bacc1 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Fri, 8 Aug 2014 17:00:43 +0800 Subject: [PATCH 2/3] Changed incremental collect option name --- sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 89c2c515099eb..663bdc22630cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -30,7 +30,7 @@ private[spark] object SQLConf { val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions" val CODEGEN_ENABLED = "spark.sql.codegen" val DIALECT = "spark.sql.dialect" - val INCREMENTAL_COLLECT_ENABLED = "spark.sql.thriftServer.incrementalCollect" + val INCREMENTAL_COLLECT_ENABLED = "spark.sql.incrementalCollect" object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" From cb3ea4548b1f02df243cd407116948bd7fe31f3b Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 10 Aug 2014 19:56:29 +0800 Subject: [PATCH 3/3] Moved incremental collection option to Thrift server --- .../src/main/scala/org/apache/spark/sql/SQLConf.scala | 10 ---------- .../thriftserver/server/SparkSQLOperationManager.scala | 4 +++- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 663bdc22630cf..0fd7aaaa36eb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -30,7 +30,6 @@ private[spark] object SQLConf { val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions" val CODEGEN_ENABLED = "spark.sql.codegen" val DIALECT = "spark.sql.dialect" - val INCREMENTAL_COLLECT_ENABLED = "spark.sql.incrementalCollect" object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" @@ -105,15 +104,6 @@ trait SQLConf { private[spark] def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, (autoBroadcastJoinThreshold + 1).toString).toLong - /** - * When set to true, the Hive Thrift server will collect SQL result set incrementally (one - * partition at a time) to decrease the risk of OOM on driver side. This can be useful when the - * result set is potentially large. The cost is that *the last* stage of the RDD DAG generated - * from the SQL query plan is executed sequentially, and hurts performance. - */ - private[spark] def incrementalCollectEnabled: Boolean = - getConf(INCREMENTAL_COLLECT_ENABLED, "false").toBoolean - /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 2b72540c1bad4..f192f490ac3d0 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -134,7 +134,9 @@ class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManage hiveContext.sparkContext.setJobGroup(groupId, statement) iter = { val resultRdd = result.queryExecution.toRdd - if (hiveContext.incrementalCollectEnabled) { + val useIncrementalCollect = + hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean + if (useIncrementalCollect) { resultRdd.toLocalIterator } else { resultRdd.collect().iterator