From 699644c692472e5b78baa56a1a6c44d8d174e70e Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Mon, 22 Feb 2016 15:27:29 -0800
Subject: [PATCH 1/7] [SPARK-12546][SQL] Change default number of open parquet files

A common problem that users encounter with Spark 1.6.0 is that writing to a
partitioned parquet table OOMs. The root cause is that parquet allocates a
significant amount of memory that is not accounted for by our own mechanisms.
As a workaround, we can ensure that only a single file is open per task unless
the user explicitly asks for more (a usage sketch follows this patch series).

Author: Michael Armbrust

Closes #11308 from marmbrus/parquetWriteOOM.

(cherry picked from commit 173aa949c309ff7a7a03e9d762b9108542219a95)
Signed-off-by: Michael Armbrust
---
 sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 58adf64e49869..6cc680a1b2286 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -396,7 +396,7 @@ private[spark] object SQLConf {
 
   val PARTITION_MAX_FILES =
     intConf("spark.sql.sources.maxConcurrentWrites",
-      defaultValue = Some(5),
+      defaultValue = Some(1),
       doc = "The maximum number of concurrent files to open before falling back on sorting when " +
         "writing out files using dynamic partitioning.")
 

From 85e6a2205d4549c81edbc2238fd15659120cee78 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Mon, 22 Feb 2016 17:42:30 -0800
Subject: [PATCH 2/7] [SPARK-13298][CORE][UI] Escape "label" to avoid DAG being broken by some special character

## What changes were proposed in this pull request?

When there are some special characters (e.g., `"`, `\`) in `label`, the DAG will
be broken. This patch just escapes `label` to avoid the DAG being broken by some
special characters (an escaping sketch follows this patch series).

## How was this patch tested?

Jenkins tests

Author: Shixiong Zhu

Closes #11309 from zsxwing/SPARK-13298.

(cherry picked from commit a11b3995190cb4a983adcc8667f7b316cce18d24)
Signed-off-by: Andrew Or
---
 .../org/apache/spark/ui/scope/RDDOperationGraph.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index e9c8a8e299cd7..54ba03e61868c 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -20,10 +20,11 @@ package org.apache.spark.ui.scope
 import scala.collection.mutable
 import scala.collection.mutable.{StringBuilder, ListBuffer}
 
+import org.apache.commons.lang3.StringEscapeUtils
+
 import org.apache.spark.Logging
 import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.CallSite
 
 /**
  * A representation of a generic cluster graph used for storing information on RDD operations.
@@ -179,7 +180,7 @@ private[ui] object RDDOperationGraph extends Logging {
   /** Return the dot representation of a node in an RDDOperationGraph. */
   private def makeDotNode(node: RDDOperationNode): String = {
     val label = s"${node.name} [${node.id}]\n${node.callsite}"
-    s"""${node.id} [label="$label"]"""
+    s"""${node.id} [label="${StringEscapeUtils.escapeJava(label)}"]"""
   }
 
   /** Update the dot representation of the RDDOperationGraph in cluster to subgraph. */
@@ -188,7 +189,7 @@ private[ui] object RDDOperationGraph extends Logging {
       cluster: RDDOperationCluster,
       indent: String): Unit = {
     subgraph.append(indent).append(s"subgraph cluster${cluster.id} {\n")
-    subgraph.append(indent).append(s"""  label="${cluster.name}";\n""")
+      .append(indent).append(s"""  label="${StringEscapeUtils.escapeJava(cluster.name)}";\n""")
     cluster.childNodes.foreach { node =>
       subgraph.append(indent).append(s"  ${makeDotNode(node)};\n")
     }

From f7898f9e2df131fa78200f6034508e74a78c2a44 Mon Sep 17 00:00:00 2001
From: Daoyuan Wang
Date: Mon, 22 Feb 2016 18:13:32 -0800
Subject: [PATCH 3/7] [SPARK-11624][SPARK-11972][SQL] fix commands that need hive to exec

In SparkSQLCLI, we have created a `CliSessionState`, but then we call
`SparkSQLEnv.init()`, which will start another `SessionState`. This would lead
to an exception because `processCmd` needs to get the `CliSessionState` instance
by calling `SessionState.get()`, but the return value would be an instance of
`SessionState` (a minimal sketch of this failure mode follows this patch
series). See the exception below.

    spark-sql> !echo "test";
    Exception in thread "main" java.lang.ClassCastException: org.apache.hadoop.hive.ql.session.SessionState cannot be cast to org.apache.hadoop.hive.cli.CliSessionState
        at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:112)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:301)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:242)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:691)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Author: Daoyuan Wang

Closes #9589 from adrian-wang/clicommand.
(cherry picked from commit 5d80fac58f837933b5359a8057676f45539e53af)
Signed-off-by: Michael Armbrust

Conflicts:
	sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
---
 .../hive/thriftserver/SparkSQLCLIDriver.scala      |  7 ++-
 .../sql/hive/thriftserver/CliSuite.scala           |  5 ++
 sql/hive/pom.xml                                   |  6 +++
 .../spark/sql/hive/client/ClientWrapper.scala      | 53 +++++++++++--------
 4 files changed, 48 insertions(+), 23 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 6419002a2aa89..4be9bd936553b 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -276,8 +276,11 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
     val tokens: Array[String] = cmd_trimmed.split("\\s+")
     val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
     if (cmd_lower.equals("quit") ||
-      cmd_lower.equals("exit") ||
-      tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
+      cmd_lower.equals("exit")) {
+      sessionState.close()
+      System.exit(0)
+    }
+    if (tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
       cmd_trimmed.startsWith("!") ||
       tokens(0).toLowerCase.equals("list") ||
       isRemoteMode) {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index fcf039916913a..58ebc000031d0 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -234,4 +234,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
       -> "Error in query: Table not found: nonexistent_table;"
     )
   }
+
+  test("SPARK-11624 Spark SQL CLI should set sessionState only once") {
+    runCliWithin(2.minute, Seq("-e", "!echo \"This is a test for Spark-11624\";"))(
+      "" -> "This is a test for Spark-11624")
+  }
 }
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index d96f3e2b9f62b..57c867f829ff5 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -72,6 +72,12 @@
       <artifactId>protobuf-java</artifactId>
       <version>${protobuf.version}</version>
     </dependency>
+-->
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-cli</artifactId>
+    </dependency>
+<!--
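
Usage sketch for [PATCH 1/7] (SPARK-12546): the patch only lowers the default of
`spark.sql.sources.maxConcurrentWrites` from 5 to 1. The snippet below is a
hypothetical example on the Spark 1.6-era API, not part of the patch, showing how
a user could opt back into several concurrent parquet writers per task when the
extra writer buffer memory is acceptable; the app name, column names, data, and
output path are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object MaxConcurrentWritesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("parquet-write-example").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // After the patch the default is a single open file per task; raise it
    // explicitly only if the executors have headroom for parquet's per-writer buffers.
    sqlContext.setConf("spark.sql.sources.maxConcurrentWrites", "5")

    // Toy partitioned write that exercises dynamic partitioning.
    val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"), (3, "a"))).toDF("id", "part")
    df.write.partitionBy("part").parquet("/tmp/parquet-write-example")

    sc.stop()
  }
}
```

Setting the same key at submit time (e.g. `--conf spark.sql.sources.maxConcurrentWrites=5`)
should work as well, since SQL properties from the Spark configuration are picked up by
`SQLContext`.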
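Illustration for [PATCH 2/7] (SPARK-13298): a standalone sketch, assuming only
`commons-lang3` on the classpath, that mirrors (but does not reuse) the patched
`makeDotNode` to show why the DOT `label` attribute needs `StringEscapeUtils.escapeJava`;
the sample label text is invented.

```scala
import org.apache.commons.lang3.StringEscapeUtils

object DotLabelEscapeExample {
  def main(args: Array[String]): Unit = {
    // An RDD name/callsite containing characters that are special inside a DOT string.
    val label = "map at \"Job\".scala:42\nC:\\jobs\\my_job"

    // Unescaped, the embedded quote terminates the label="..." attribute early
    // and the rendered DAG breaks.
    val broken = s"""1 [label="$label"]"""
    // escapeJava turns ", \ and the newline into \", \\ and \n, keeping the line well-formed.
    val fixed = s"""1 [label="${StringEscapeUtils.escapeJava(label)}"]"""

    println(broken)
    println(fixed)
  }
}
```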
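Illustration for [PATCH 3/7] (SPARK-11624): a minimal, self-contained model of the
reported failure, using stand-in classes rather than the real Hive `SessionState` and
`CliSessionState`; it only demonstrates why a second initialization makes the downcast
in `processCmd` throw `ClassCastException`.

```scala
object SessionStateCastExample {
  // Stand-ins for org.apache.hadoop.hive.ql.session.SessionState and
  // org.apache.hadoop.hive.cli.CliSessionState; not the Hive classes themselves.
  class SessionState
  class CliSessionState extends SessionState

  private val current = new ThreadLocal[SessionState]

  def start(ss: SessionState): Unit = current.set(ss)
  def get(): SessionState = current.get()

  def main(args: Array[String]): Unit = {
    start(new CliSessionState)   // what the CLI driver installs first
    start(new SessionState)      // what a second initialization overwrites it with

    // The processCmd-style downcast now fails, mirroring the reported stack trace.
    try {
      val cli = get().asInstanceOf[CliSessionState]
      println(s"got $cli")
    } catch {
      case e: ClassCastException => println(s"ClassCastException: ${e.getMessage}")
    }
  }
}
```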