26 changes: 7 additions & 19 deletions python/pyspark/shell.py
@@ -38,25 +38,13 @@
 SparkContext._ensure_initialized()

 try:
-    # Try to access HiveConf, it will raise exception if Hive is not added
-    conf = SparkConf()
-    if conf.get('spark.sql.catalogImplementation', 'hive').lower() == 'hive':
-        SparkContext._jvm.org.apache.hadoop.hive.conf.HiveConf()
-        spark = SparkSession.builder\
-            .enableHiveSupport()\
-            .getOrCreate()
-    else:
-        spark = SparkSession.builder.getOrCreate()
-except py4j.protocol.Py4JError:
-    if conf.get('spark.sql.catalogImplementation', '').lower() == 'hive':
-        warnings.warn("Fall back to non-hive support because failing to access HiveConf, "
-                      "please make sure you build spark with hive")
-    spark = SparkSession.builder.getOrCreate()
-except TypeError:
-    if conf.get('spark.sql.catalogImplementation', '').lower() == 'hive':
-        warnings.warn("Fall back to non-hive support because failing to access HiveConf, "
-                      "please make sure you build spark with hive")
-    spark = SparkSession.builder.getOrCreate()
+    spark = SparkSession._create_shell_session()
+except Exception:
+    import sys
+    import traceback
+    warnings.warn("Failed to initialize Spark session.")
+    traceback.print_exc(file=sys.stderr)
+    sys.exit(1)

 sc = spark.sparkContext
 sql = spark.sql
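The new shell.py error path is the standard warn, print the traceback, and exit pattern. Below is a minimal standalone sketch of that pattern, using a stand-in `create_session` that always fails rather than the real `SparkSession._create_shell_session()` call:

```python
import sys
import traceback
import warnings


def create_session():
    # Stand-in for SparkSession._create_shell_session(); it always fails here
    # so the error path below can be exercised without a Spark installation.
    raise RuntimeError("could not start the JVM gateway")


try:
    spark = create_session()
except Exception:
    # Mirrors the new shell.py behavior: a short warning, the full traceback
    # on stderr, and a non-zero exit code instead of a half-initialized shell.
    warnings.warn("Failed to initialize Spark session.")
    traceback.print_exc(file=sys.stderr)
    sys.exit(1)
```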
34 changes: 34 additions & 0 deletions python/pyspark/sql/session.py
@@ -547,6 +547,40 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone):
         df._schema = schema
         return df

+    @staticmethod
+    def _create_shell_session():
+        """
+        Initialize a SparkSession for a pyspark shell session. This is called from shell.py
+        to make error handling simpler without needing to declare local variables in that
+        script, which would expose those to users.
+        """
+        import py4j
+        from pyspark.conf import SparkConf
+        from pyspark.context import SparkContext
+        try:
+            # Try to access HiveConf, it will raise exception if Hive is not added
+            conf = SparkConf()
+            if conf.get('spark.sql.catalogImplementation', 'hive').lower() == 'hive':
+                SparkContext._jvm.org.apache.hadoop.hive.conf.HiveConf()
+                return SparkSession.builder\
+                    .enableHiveSupport()\
+                    .getOrCreate()
+            else:
+                return SparkSession.builder.getOrCreate()
+        except py4j.protocol.Py4JError:
+            if conf.get('spark.sql.catalogImplementation', '').lower() == 'hive':
+                warnings.warn("Fall back to non-hive support because failing to access HiveConf, "
+                              "please make sure you build spark with hive")
+
+        try:
+            return SparkSession.builder.getOrCreate()
Review comment (Member, on the line above): The call flow seems to be changed here? I think this line is meant to be inside the handling of Py4JError?

Reply (Contributor, author): This is intentional, to avoid the Python exception being unreadable (see the commit description). The actual flow logic is the same.
+        except TypeError:
+            if conf.get('spark.sql.catalogImplementation', '').lower() == 'hive':
+                warnings.warn("Fall back to non-hive support because failing to access HiveConf, "
+                              "please make sure you build spark with hive")
+
+        return SparkSession.builder.getOrCreate()
+
     @since(2.0)
     @ignore_unicode_prefix
     def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
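To illustrate the author's point in the exchange above: in Python, an exception raised while an `except` block is still handling another one is chained in the traceback ("During handling of the above exception, another exception occurred"), so placing the fallback `getOrCreate()` after the handler rather than inside it keeps a failure on the fallback path down to a single traceback. A small sketch with stand-in functions (not the real PySpark calls):

```python
def hive_session():
    # Stand-in for the Hive-enabled builder path; always fails.
    raise ValueError("HiveConf is not on the classpath")


def plain_session():
    # Stand-in for the plain builder fallback; also fails, to show the tracebacks.
    raise RuntimeError("cluster unreachable")


def nested_fallback():
    # Fallback inside the handler: when plain_session() fails, the traceback
    # also drags in the original ValueError, joined by "During handling of the
    # above exception, another exception occurred".
    try:
        return hive_session()
    except ValueError:
        return plain_session()


def sequential_fallback():
    # Fallback after the handler, as in _create_shell_session(): the ValueError
    # is fully handled first, so a failure here prints one clean traceback.
    try:
        return hive_session()
    except ValueError:
        pass
    return plain_session()
```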
72 changes: 40 additions & 32 deletions repl/src/main/scala/org/apache/spark/repl/Main.scala
@@ -44,6 +44,7 @@ object Main extends Logging {
   var interp: SparkILoop = _

   private var hasErrors = false
+  private var isShellSession = false

   private def scalaOptionError(msg: String): Unit = {
     hasErrors = true
@@ -53,6 +54,7 @@
   }

   def main(args: Array[String]) {
+    isShellSession = true
     doMain(args, new SparkILoop)
   }

@@ -79,44 +81,50 @@
   }

   def createSparkSession(): SparkSession = {
-    val execUri = System.getenv("SPARK_EXECUTOR_URI")
-    conf.setIfMissing("spark.app.name", "Spark shell")
-    // SparkContext will detect this configuration and register it with the RpcEnv's
-    // file server, setting spark.repl.class.uri to the actual URI for executors to
-    // use. This is sort of ugly but since executors are started as part of SparkContext
-    // initialization in certain cases, there's an initialization order issue that prevents
-    // this from being set after SparkContext is instantiated.
-    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
-    if (execUri != null) {
-      conf.set("spark.executor.uri", execUri)
-    }
-    if (System.getenv("SPARK_HOME") != null) {
-      conf.setSparkHome(System.getenv("SPARK_HOME"))
-    }
+    try {
+      val execUri = System.getenv("SPARK_EXECUTOR_URI")
+      conf.setIfMissing("spark.app.name", "Spark shell")
+      // SparkContext will detect this configuration and register it with the RpcEnv's
+      // file server, setting spark.repl.class.uri to the actual URI for executors to
+      // use. This is sort of ugly but since executors are started as part of SparkContext
+      // initialization in certain cases, there's an initialization order issue that prevents
+      // this from being set after SparkContext is instantiated.
+      conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
+      if (execUri != null) {
+        conf.set("spark.executor.uri", execUri)
+      }
+      if (System.getenv("SPARK_HOME") != null) {
+        conf.setSparkHome(System.getenv("SPARK_HOME"))
+      }

-    val builder = SparkSession.builder.config(conf)
-    if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase(Locale.ROOT) == "hive") {
-      if (SparkSession.hiveClassesArePresent) {
-        // In the case that the property is not set at all, builder's config
-        // does not have this value set to 'hive' yet. The original default
-        // behavior is that when there are hive classes, we use hive catalog.
-        sparkSession = builder.enableHiveSupport().getOrCreate()
-        logInfo("Created Spark session with Hive support")
+      val builder = SparkSession.builder.config(conf)
+      if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase(Locale.ROOT) == "hive") {
+        if (SparkSession.hiveClassesArePresent) {
+          // In the case that the property is not set at all, builder's config
+          // does not have this value set to 'hive' yet. The original default
+          // behavior is that when there are hive classes, we use hive catalog.
+          sparkSession = builder.enableHiveSupport().getOrCreate()
+          logInfo("Created Spark session with Hive support")
+        } else {
+          // Need to change it back to 'in-memory' if no hive classes are found
+          // in the case that the property is set to hive in spark-defaults.conf
+          builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")
+          sparkSession = builder.getOrCreate()
+          logInfo("Created Spark session")
+        }
       } else {
-        // Need to change it back to 'in-memory' if no hive classes are found
-        // in the case that the property is set to hive in spark-defaults.conf
-        builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")
+        // In the case that the property is set but not to 'hive', the internal
+        // default is 'in-memory'. So the sparkSession will use in-memory catalog.
         sparkSession = builder.getOrCreate()
         logInfo("Created Spark session")
       }
-    } else {
-      // In the case that the property is set but not to 'hive', the internal
-      // default is 'in-memory'. So the sparkSession will use in-memory catalog.
-      sparkSession = builder.getOrCreate()
-      logInfo("Created Spark session")
+      sparkContext = sparkSession.sparkContext
+      sparkSession
+    } catch {
+      case e: Exception if isShellSession =>
+        logError("Failed to initialize Spark session.", e)
Review comment (Member, on the line above): @vanzin, it seems e.printStackTrace() is missing here?

Reply (Contributor, author): The exception is being printed as part of the logError.
+        sys.exit(1)
     }
-    sparkContext = sparkSession.sparkContext
-    sparkSession
   }

 }
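For readers following along from the Python side, the Scala change amounts to a flag that only the shell entry point sets, plus a catch guarded on that flag, so initialization failures terminate the process only for the interactive shell and still propagate to any other caller of `createSparkSession()`. A rough Python analogue of that pattern, for illustration only (the names below are not Spark APIs):

```python
import logging
import sys

logger = logging.getLogger(__name__)

# Mirrors `private var isShellSession = false`; only the shell entry point flips it.
is_shell_session = False


def create_session(factory):
    try:
        return factory()
    except Exception:
        if not is_shell_session:
            # Equivalent to the guarded `case e: Exception if isShellSession`:
            # non-shell callers still see the original exception.
            raise
        # logError(..., e) in Main.scala logs the stack trace as part of the
        # message; logger.exception does the same here.
        logger.exception("Failed to initialize Spark session.")
        sys.exit(1)


def main(factory):
    global is_shell_session
    is_shell_session = True  # mirrors `isShellSession = true` in main()
    return create_session(factory)
```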