From 17e0703ab0a1cf082b6b7a227c3d9631903de3a7 Mon Sep 17 00:00:00 2001
From: Sandeep Singh
Date: Tue, 3 May 2016 04:26:31 +0530
Subject: [PATCH 1/3] [SPARK-15072][SQL][REPL][EXAMPLES] Remove SparkSession.withHiveSupport

---
 .../spark/examples/sql/hive/HiveFromSpark.scala | 2 +-
 .../scala/org/apache/spark/repl/SparkILoop.scala | 2 +-
 .../src/main/scala/org/apache/spark/repl/Main.scala | 2 +-
 .../scala/org/apache/spark/sql/SparkSession.scala | 13 -------------
 .../spark/sql/hive/thriftserver/SparkSQLEnv.scala | 2 +-
 .../resources/regression-test-SPARK-8489/Main.scala | 2 +-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +-
 7 files changed, 6 insertions(+), 19 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index ff33091621c1..c695e90ce893 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -42,7 +42,7 @@ object HiveFromSpark {
     // using HiveQL. Users who do not have an existing Hive deployment can still create a
     // HiveContext. When not configured by the hive-site.xml, the context automatically
     // creates metastore_db and warehouse in the current directory.
-    val sparkSession = SparkSession.withHiveSupport(sc)
+    val sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
     import sparkSession.implicits._
     import sparkSession.sql

diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index b1e95d8fdb60..586a2d2716e3 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1023,7 +1023,7 @@ class SparkILoop(
     val builder = SparkSession.builder.config(conf)
     val sparkSession = if (SparkSession.hiveClassesArePresent) {
       logInfo("Creating Spark session with Hive support")
-      builder.enableHiveSupport().getOrCreate()
+      SparkSession.builder.enableHiveSupport().getOrCreate()
     } else {
       logInfo("Creating Spark session")
       builder.getOrCreate()

diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 005edda2bee7..d7f6906d85f3 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -89,7 +89,7 @@ object Main extends Logging {
     val builder = SparkSession.builder.config(conf)

     if (SparkSession.hiveClassesArePresent) {
-      sparkSession = builder.enableHiveSupport().getOrCreate()
+      sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
       logInfo("Created Spark session with Hive support")
     } else {
       sparkSession = builder.getOrCreate()

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index aa7c335c53d2..9ed3756628c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -816,17 +816,4 @@ object SparkSession {
     }
   }

-  /**
-   * Create a new [[SparkSession]] with a catalog backed by Hive.
-   */
-  def withHiveSupport(sc: SparkContext): SparkSession = {
-    if (hiveClassesArePresent) {
-      sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive")
-      new SparkSession(sc)
-    } else {
-      throw new IllegalArgumentException(
-        "Unable to instantiate SparkSession with Hive support because Hive classes are not found.")
-    }
-  }
-
 }

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 665a44e51a0c..16709930eb27 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -55,7 +55,7 @@ private[hive] object SparkSQLEnv extends Logging {
         maybeKryoReferenceTracking.getOrElse("false"))

       sparkContext = new SparkContext(sparkConf)
-      sqlContext = SparkSession.withHiveSupport(sparkContext).wrapped
+      sqlContext = SparkSession.builder.enableHiveSupport().getOrCreate().wrapped
       val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
       sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
       sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))

diff --git a/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala b/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala
index 10a017df831e..517995d5b8fa 100644
--- a/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala
+++ b/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala
@@ -34,7 +34,7 @@ object Main {
   def main(args: Array[String]) {
     // scalastyle:off println
     println("Running regression test for SPARK-8489.")
     val sc = new SparkContext("local", "testing")
-    val sparkSession = SparkSession.withHiveSupport(sc)
+    val sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
     // This line should not throw scala.reflect.internal.MissingRequirementError.
     // See SPARK-8470 for more detail.
     val df = sparkSession.createDataFrame(Seq(MyCoolClass("1", "2", "3")))

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 77a6a94a6719..7ff4903ac833 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -291,7 +291,7 @@ object SetWarehouseLocationTest extends Logging {
     conf.set("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)

     val sc = new SparkContext(conf)
-    val sparkSession = SparkSession.withHiveSupport(sc)
+    val sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
     val catalog = sparkSession.sessionState.catalog

     sparkSession.sql("drop table if exists testLocation")

From 7ad494d524f5a920b1719acec183f5c49c4729c3 Mon Sep 17 00:00:00 2001
From: Sandeep Singh
Date: Thu, 5 May 2016 11:41:51 +0530
Subject: [PATCH 2/3] Address comments

---
 .../apache/spark/examples/sql/hive/HiveFromSpark.scala | 5 +++--
 .../main/scala/org/apache/spark/repl/SparkILoop.scala | 2 +-
 .../src/main/scala/org/apache/spark/repl/Main.scala | 2 +-
 .../spark/sql/hive/thriftserver/SparkSQLEnv.scala | 10 ++++++----
 .../resources/regression-test-SPARK-8489/Main.scala | 10 ++++++++--
 .../apache/spark/sql/hive/HiveSparkSubmitSuite.scala | 5 +++--
 6 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index c695e90ce893..4f8f76f34bf5 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -36,13 +36,14 @@ object HiveFromSpark {
   def main(args: Array[String]) {
     val sparkConf = new SparkConf().setAppName("HiveFromSpark")
-    val sc = new SparkContext(sparkConf)

     // A hive context adds support for finding tables in the MetaStore and writing queries
     // using HiveQL. Users who do not have an existing Hive deployment can still create a
     // HiveContext. When not configured by the hive-site.xml, the context automatically
     // creates metastore_db and warehouse in the current directory.
-    val sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
+    val sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate()
+    val sc = sparkSession.sparkContext
+
     import sparkSession.implicits._
     import sparkSession.sql

diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 586a2d2716e3..b1e95d8fdb60 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1023,7 +1023,7 @@ class SparkILoop(
     val builder = SparkSession.builder.config(conf)
     val sparkSession = if (SparkSession.hiveClassesArePresent) {
       logInfo("Creating Spark session with Hive support")
-      SparkSession.builder.enableHiveSupport().getOrCreate()
+      builder.enableHiveSupport().getOrCreate()
     } else {
       logInfo("Creating Spark session")
       builder.getOrCreate()

diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index d7f6906d85f3..005edda2bee7 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -89,7 +89,7 @@ object Main extends Logging {
     val builder = SparkSession.builder.config(conf)

     if (SparkSession.hiveClassesArePresent) {
-      sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
+      sparkSession = builder.enableHiveSupport().getOrCreate()
       logInfo("Created Spark session with Hive support")
     } else {
       sparkSession = builder.getOrCreate()

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 16709930eb27..8de223f444f7 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -54,13 +54,15 @@ private[hive] object SparkSQLEnv extends Logging {
         "spark.kryo.referenceTracking",
         maybeKryoReferenceTracking.getOrElse("false"))

-      sparkContext = new SparkContext(sparkConf)
-      sqlContext = SparkSession.builder.enableHiveSupport().getOrCreate().wrapped
-      val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
+      val sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate()
+      sparkContext = sparkSession.sparkContext
+      sqlContext = sparkSession.wrapped
+
+      val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
       sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
       sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
       sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
-      sqlContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
+      sparkSession.conf.set("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
     }
   }

diff --git a/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala b/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala
index 517995d5b8fa..a3e50d7198b9 100644
--- a/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala
+++ b/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.sql.SparkSession

 /**
@@ -33,8 +33,14 @@ object Main {
   def main(args: Array[String]) {
     // scalastyle:off println
     println("Running regression test for SPARK-8489.")
-    val sc = new SparkContext("local", "testing")
+
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("testing")
+
     val sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
+    val sc = sparkSession.sparkContext
+
     // This line should not throw scala.reflect.internal.MissingRequirementError.
     // See SPARK-8470 for more detail.
     val df = sparkSession.createDataFrame(Seq(MyCoolClass("1", "2", "3")))

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 7ff4903ac833..4cdf22a7f85a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -290,8 +290,9 @@ object SetWarehouseLocationTest extends Logging {
     conf.set("spark.sql.warehouse.dir", warehouseLocation.toString)
     conf.set("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)

-    val sc = new SparkContext(conf)
-    val sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
+    val sparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
+    val sc = sparkSession.sparkContext
+
     val catalog = sparkSession.sessionState.catalog

     sparkSession.sql("drop table if exists testLocation")

From 5e464ad766535061d568bc10d6904d1c96857ef1 Mon Sep 17 00:00:00 2001
From: Sandeep Singh
Date: Thu, 5 May 2016 23:16:14 +0530
Subject: [PATCH 3/3] address comments

---
 .../spark/examples/sql/hive/HiveFromSpark.scala | 13 ++++++++-----
 .../resources/regression-test-SPARK-8489/Main.scala | 12 +++---------
 .../spark/sql/hive/HiveSparkSubmitSuite.scala | 6 ++++--
 3 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
index 4f8f76f34bf5..a15cf5ded0e7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala
@@ -41,11 +41,14 @@ object HiveFromSpark {
     // using HiveQL. Users who do not have an existing Hive deployment can still create a
     // HiveContext. When not configured by the hive-site.xml, the context automatically
     // creates metastore_db and warehouse in the current directory.
- val sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate() - val sc = sparkSession.sparkContext + val spark = SparkSession.builder + .config(sparkConf) + .enableHiveSupport() + .getOrCreate() + val sc = spark.sparkContext - import sparkSession.implicits._ - import sparkSession.sql + import spark.implicits._ + import spark.sql sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sql(s"LOAD DATA LOCAL INPATH '${kv1File.getAbsolutePath}' INTO TABLE src") @@ -75,7 +78,7 @@ object HiveFromSpark { println("Result of SELECT *:") sql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println) - sc.stop() + spark.stop() } } // scalastyle:on println diff --git a/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala b/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala index a3e50d7198b9..10a017df831e 100644 --- a/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala +++ b/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.SparkContext import org.apache.spark.sql.SparkSession /** @@ -33,14 +33,8 @@ object Main { def main(args: Array[String]) { // scalastyle:off println println("Running regression test for SPARK-8489.") - - val conf = new SparkConf() - .setMaster("local") - .setAppName("testing") - - val sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate() - val sc = sparkSession.sparkContext - + val sc = new SparkContext("local", "testing") + val sparkSession = SparkSession.withHiveSupport(sc) // This line should not throw scala.reflect.internal.MissingRequirementError. // See SPARK-8470 for more detail. val df = sparkSession.createDataFrame(Seq(MyCoolClass("1", "2", "3"))) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 4cdf22a7f85a..a32001179949 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -290,8 +290,10 @@ object SetWarehouseLocationTest extends Logging { conf.set("spark.sql.warehouse.dir", warehouseLocation.toString) conf.set("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString) - val sparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate() - val sc = sparkSession.sparkContext + val sparkSession = SparkSession.builder + .config(conf) + .enableHiveSupport() + .getOrCreate() val catalog = sparkSession.sessionState.catalog
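
Reviewer note, not part of the patches above: after this series, every call site lands on the same builder pattern. A minimal, self-contained Scala sketch of that pattern follows; the object name, app name, and query are illustrative placeholders, not taken from the patches.

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object BuilderExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("builder-example") // illustrative name
        // enableHiveSupport() performs the same classpath check the removed
        // withHiveSupport did, throwing IllegalArgumentException when the
        // Hive classes are not found.
        val spark = SparkSession.builder
          .config(conf)
          .enableHiveSupport()
          .getOrCreate()
        // The SparkContext is now obtained from the session rather than
        // constructed first and passed in.
        val sc = spark.sparkContext
        println(s"Running Spark ${sc.version}")

        spark.sql("SHOW TABLES").show() // illustrative query
        spark.stop()
      }
    }

The key design point the series converges on: configuration flows into the builder (via config(...)) before getOrCreate(), instead of mutating the conf of an already-constructed SparkContext, which is what made the old withHiveSupport(sc) helper fragile.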