Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,19 @@ object HiveFromSpark {

def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HiveFromSpark")
val sc = new SparkContext(sparkConf)

// A hive context adds support for finding tables in the MetaStore and writing queries
// using HiveQL. Users who do not have an existing Hive deployment can still create a
// HiveContext. When not configured by the hive-site.xml, the context automatically
// creates metastore_db and warehouse in the current directory.
val sparkSession = SparkSession.withHiveSupport(sc)
import sparkSession.implicits._
import sparkSession.sql
val spark = SparkSession.builder
.config(sparkConf)
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql(s"LOAD DATA LOCAL INPATH '${kv1File.getAbsolutePath}' INTO TABLE src")
Expand Down Expand Up @@ -74,7 +78,7 @@ object HiveFromSpark {
println("Result of SELECT *:")
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println)

sc.stop()
spark.stop()
}
}
// scalastyle:on println
13 changes: 0 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -816,17 +816,4 @@ object SparkSession {
}
}

/**
* Create a new [[SparkSession]] with a catalog backed by Hive.
*/
def withHiveSupport(sc: SparkContext): SparkSession = {
if (hiveClassesArePresent) {
sc.conf.set(CATALOG_IMPLEMENTATION.key, "hive")
new SparkSession(sc)
} else {
throw new IllegalArgumentException(
"Unable to instantiate SparkSession with Hive support because Hive classes are not found.")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@ private[hive] object SparkSQLEnv extends Logging {
"spark.kryo.referenceTracking",
maybeKryoReferenceTracking.getOrElse("false"))

sparkContext = new SparkContext(sparkConf)
sqlContext = SparkSession.withHiveSupport(sparkContext).wrapped
val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
val sparkSession = SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate()
sparkContext = sparkSession.sparkContext
sqlContext = sparkSession.wrapped

val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
sqlContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
sparkSession.conf.set("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,11 @@ object SetWarehouseLocationTest extends Logging {
conf.set("spark.sql.warehouse.dir", warehouseLocation.toString)
conf.set("hive.metastore.warehouse.dir", hiveWarehouseLocation.toString)

val sc = new SparkContext(conf)
val sparkSession = SparkSession.withHiveSupport(sc)
val sparkSession = SparkSession.builder
.config(conf)
.enableHiveSupport()
.getOrCreate()

val catalog = sparkSession.sessionState.catalog

sparkSession.sql("drop table if exists testLocation")
Expand Down