examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala

@@ -18,7 +18,6 @@
 // scalastyle:off println
 package org.apache.spark.examples.sql

-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.{SaveMode, SparkSession}

 // One method for defining the schema of an RDD is to make a case class with the desired column
@@ -27,14 +26,12 @@ case class Record(key: Int, value: String)

 object RDDRelation {
   def main(args: Array[String]) {
-    val sparkConf = new SparkConf().setAppName("RDDRelation")
-    val sc = new SparkContext(sparkConf)
-    val spark = new SparkSession(sc)
+    val spark = SparkSession.builder.appName("RDDRelation").getOrCreate()

     // Importing the SparkSession gives access to all the SQL functions and implicit conversions.
     import spark.implicits._

-    val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF()
+    val df = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
     // Any RDD containing case classes can be registered as a table. The schema of the table is
     // automatically inferred using scala reflection.
     df.registerTempTable("records")
@@ -70,7 +67,7 @@ object RDDRelation {
     parquetFile.registerTempTable("parquetFile")
     spark.sql("SELECT * FROM parquetFile").collect().foreach(println)

-    sc.stop()
+    spark.stop()
   }
 }
 // scalastyle:on println
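Taken together, the migrated example reads roughly as follows. This is a consolidated sketch of the new pattern, not the file itself; the object name is illustrative, and `registerTempTable` was still the current API at this point:

```scala
import org.apache.spark.sql.SparkSession

case class Record(key: Int, value: String)

object RDDRelationSketch {
  def main(args: Array[String]): Unit = {
    // The builder replaces the explicit SparkConf/SparkContext wiring.
    val spark = SparkSession.builder.appName("RDDRelation").getOrCreate()

    // Schema is inferred from the Record case class via Scala reflection.
    val df = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    df.registerTempTable("records")
    spark.sql("SELECT count(*) FROM records").collect().foreach(println)

    // stop() on the session stops the underlying SparkContext.
    spark.stop()
  }
}
```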
SparkILoop.scala

@@ -1030,10 +1030,10 @@ class SparkILoop(
   def createSparkSession(): SparkSession = {
     if (SparkSession.hiveClassesArePresent) {
       logInfo("Creating Spark session with Hive support")
-      SparkSession.withHiveSupport(sparkContext)
+      SparkSession.builder.enableHiveSupport().getOrCreate()
     } else {
       logInfo("Creating Spark session")
-      new SparkSession(sparkContext)
+      SparkSession.builder.getOrCreate()
     }
   }
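Both REPL entry points branch on `SparkSession.hiveClassesArePresent`. A check of that sort amounts to a classpath probe; the sketch below is illustrative only, and the probed class name is an assumption rather than Spark's actual list:

```scala
object HiveProbeSketch {
  // Illustrative classpath probe in the spirit of SparkSession.hiveClassesArePresent.
  // The probed class name is an assumption; Spark's real check may differ.
  def hiveClassesPresent: Boolean =
    try {
      Class.forName("org.apache.hadoop.hive.conf.HiveConf")
      true
    } catch {
      case _: ClassNotFoundException => false
    }
}
```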
Main.scala

@@ -94,10 +94,10 @@ object Main extends Logging {

   def createSparkSession(): SparkSession = {
     if (SparkSession.hiveClassesArePresent) {
-      sparkSession = SparkSession.withHiveSupport(sparkContext)
+      sparkSession = SparkSession.builder.enableHiveSupport().getOrCreate()
Contributor: They are not identical. The previous one will create a new SparkSession anyway, but the new one may not create a new session if a SparkContext already exists, so we can't guarantee the returned session supports Hive. (A sketch of this pitfall follows this file's diff.)

Contributor (author): good catch

logInfo("Created Spark session with Hive support")
} else {
sparkSession = new SparkSession(sparkContext)
sparkSession = SparkSession.builder.getOrCreate()
logInfo("Created Spark session")
}
sparkSession
Expand Down
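A minimal sketch of the pitfall the reviewer raises, assuming the 2.0 builder semantics where `getOrCreate()` returns any already-active session and `enableHiveSupport()` only takes effect on a freshly created one:

```scala
import org.apache.spark.sql.SparkSession

object HivePitfallSketch {
  def main(args: Array[String]): Unit = {
    // Someone creates a plain session first (e.g. from an existing SparkContext).
    val plain = SparkSession.builder.master("local").getOrCreate()

    // This call finds the existing session and returns it as-is;
    // enableHiveSupport() is effectively ignored here.
    val maybeHive = SparkSession.builder.enableHiveSupport().getOrCreate()
    assert(maybeHive eq plain) // same instance, Hive support not guaranteed

    plain.stop()
  }
}
```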
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

@@ -54,6 +54,7 @@ import org.apache.spark.util.Utils
  * {{{
  *   SparkSession.builder()
  *     .master("local")
+ *     .appName("Word Count")
  *     .config("spark.some.config.option", "some-value")
  *     .getOrCreate()
  * }}}
@@ -63,7 +64,7 @@ class SparkSession private(
     @transient private val existingSharedState: Option[SharedState])
   extends Serializable with Logging { self =>

-  def this(sc: SparkContext) {
+  private[sql] def this(sc: SparkContext) {
Contributor: actually is it possible to just make this private?

Contributor (author): we use this in SQLContext

Contributor: maybe we should change that and have SQLContext always get it from SparkSession itself?

Contributor (author): yeah, I think the problem is that in getOrCreate right now we go the other way, i.e. getting the session from the context (see the toy sketch after this file's diff)

     this(sc, None)
   }

@@ -573,7 +574,7 @@ class SparkSession private(
    * common Scala objects into [[DataFrame]]s.
    *
    * {{{
-   *   val sparkSession = new SparkSession(sc)
+   *   val sparkSession = SparkSession.builder.getOrCreate()
    *   import sparkSession.implicits._
    * }}}
    *
@@ -586,6 +587,15 @@ class SparkSession private(
   }
   // scalastyle:on

+  /**
+   * Stop the underlying [[SparkContext]].
+   *
+   * @since 2.0.0
+   */
+  def stop(): Unit = {
+    sparkContext.stop()
+  }
+
   protected[sql] def parseSql(sql: String): LogicalPlan = {
     sessionState.sqlParser.parsePlan(sql)
   }
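On the constructor thread above: `getOrCreate` resolves the session from the context side, which is why `SQLContext` cannot simply own the session without inverting that lookup. A deliberately simplified toy model of the current direction (not the real internals):

```scala
// Toy model of getOrCreate "going the other way": the session is cached on
// the context side, so a pre-existing session always wins and later builder
// options (such as Hive support) are silently ignored.
final class ToyContext
final class ToySession(val ctx: ToyContext, val hiveEnabled: Boolean)

object ToySessions {
  private var active: Option[ToySession] = None

  def getOrCreate(ctx: ToyContext, hive: Boolean = false): ToySession = synchronized {
    active.getOrElse {
      val s = new ToySession(ctx, hive)
      active = Some(s)
      s
    }
  }
}
```

In this model a pre-existing session always wins, which is the same property behind the Hive-support concern in the earlier thread.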