
Commit 0903a18

dongjoon-hyun authored and Andrew Or committed
[SPARK-15084][PYTHON][SQL] Use builder pattern to create SparkSession in PySpark.
## What changes were proposed in this pull request?

This is a python port of corresponding Scala builder pattern code. `sql.py` is modified as a target example case.

## How was this patch tested?

Manual.

Author: Dongjoon Hyun <[email protected]>

Closes #12860 from dongjoon-hyun/SPARK-15084.
1 parent c1839c9 commit 0903a18
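
For readers skimming the diff below, here is a minimal before/after sketch of the entry-point change this commit makes; the app name is just the one used in the example, and nothing else is configured.

# Before this change, the Python example built its entry point in two steps:
#
#     sc = SparkContext(appName="PythonSQL")
#     sqlContext = SQLContext(sc)
#
# After it, a single chained builder call is enough:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PythonSQL").getOrCreate()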

2 files changed: +105, -21 lines


examples/src/main/python/sql.py

Lines changed: 15 additions & 20 deletions
@@ -20,33 +20,28 @@
 import os
 import sys
 
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
+from pyspark.sql import SparkSession
 from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
 
 
 if __name__ == "__main__":
-    sc = SparkContext(appName="PythonSQL")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("PythonSQL").getOrCreate()
 
-    # RDD is created from a list of rows
-    some_rdd = sc.parallelize([Row(name="John", age=19),
-                              Row(name="Smith", age=23),
-                              Row(name="Sarah", age=18)])
-    # Infer schema from the first row, create a DataFrame and print the schema
-    some_df = sqlContext.createDataFrame(some_rdd)
+    # A list of Rows. Infer schema from the first row, create a DataFrame and print the schema
+    rows = [Row(name="John", age=19), Row(name="Smith", age=23), Row(name="Sarah", age=18)]
+    some_df = spark.createDataFrame(rows)
     some_df.printSchema()
 
-    # Another RDD is created from a list of tuples
-    another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
+    # A list of tuples
+    tuples = [("John", 19), ("Smith", 23), ("Sarah", 18)]
     # Schema with two fields - person_name and person_age
     schema = StructType([StructField("person_name", StringType(), False),
                         StructField("person_age", IntegerType(), False)])
     # Create a DataFrame by applying the schema to the RDD and print the schema
-    another_df = sqlContext.createDataFrame(another_rdd, schema)
+    another_df = spark.createDataFrame(tuples, schema)
     another_df.printSchema()
     # root
-    # |-- age: integer (nullable = true)
+    # |-- age: long (nullable = true)
     # |-- name: string (nullable = true)
 
     # A JSON dataset is pointed to by path.
@@ -57,24 +52,24 @@
     else:
         path = sys.argv[1]
     # Create a DataFrame from the file(s) pointed to by path
-    people = sqlContext.jsonFile(path)
+    people = spark.read.json(path)
     # root
     # |-- person_name: string (nullable = false)
     # |-- person_age: integer (nullable = false)
 
     # The inferred schema can be visualized using the printSchema() method.
     people.printSchema()
     # root
-    # |-- age: IntegerType
-    # |-- name: StringType
+    # |-- age: long (nullable = true)
+    # |-- name: string (nullable = true)
 
     # Register this DataFrame as a table.
-    people.registerAsTable("people")
+    people.registerTempTable("people")
 
     # SQL statements can be run by using the sql methods provided by sqlContext
-    teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 
     for each in teenagers.collect():
         print(each[0])
 
-    sc.stop()
+    spark.stop()
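
As a side note on the hunk above: with a SparkSession, createDataFrame accepts a plain Python list directly, so the former sc.parallelize calls disappear. A minimal sketch under that reading; the names and values are illustrative, not the repository's test data.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("PythonSQL").getOrCreate()

# Apply an explicit schema to a plain list of tuples; no RDD is created by hand.
schema = StructType([StructField("person_name", StringType(), False),
                     StructField("person_age", IntegerType(), False)])
another_df = spark.createDataFrame([("John", 19), ("Smith", 23), ("Sarah", 18)], schema)
another_df.printSchema()
# root
#  |-- person_name: string (nullable = false)
#  |-- person_age: integer (nullable = false)

spark.stop()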

python/pyspark/sql/session.py

Lines changed: 90 additions & 1 deletion
@@ -19,6 +19,7 @@
 import sys
 import warnings
 from functools import reduce
+from threading import RLock
 
 if sys.version >= '3':
     basestring = unicode = str
@@ -58,16 +59,98 @@ def toDF(self, schema=None, sampleRatio=None):
 
 
 class SparkSession(object):
-    """Main entry point for Spark SQL functionality.
+    """The entry point to programming Spark with the Dataset and DataFrame API.
 
     A SparkSession can be used create :class:`DataFrame`, register :class:`DataFrame` as
     tables, execute SQL over tables, cache tables, and read parquet files.
+    To create a SparkSession, use the following builder pattern:
+
+    >>> spark = SparkSession.builder \
+            .master("local") \
+            .appName("Word Count") \
+            .config("spark.some.config.option", "some-value") \
+            .getOrCreate()
 
     :param sparkContext: The :class:`SparkContext` backing this SparkSession.
     :param jsparkSession: An optional JVM Scala SparkSession. If set, we do not instantiate a new
         SparkSession in the JVM, instead we make all calls to this object.
     """
 
+    class Builder(object):
+        """Builder for :class:`SparkSession`.
+        """
+
+        _lock = RLock()
+        _options = {}
+
+        @since(2.0)
+        def config(self, key=None, value=None, conf=None):
+            """Sets a config option. Options set using this method are automatically propagated to
+            both :class:`SparkConf` and :class:`SparkSession`'s own configuration.
+
+            For an existing SparkConf, use `conf` parameter.
+            >>> from pyspark.conf import SparkConf
+            >>> SparkSession.builder.config(conf=SparkConf())
+            <pyspark.sql.session...
+
+            For a (key, value) pair, you can omit parameter names.
+            >>> SparkSession.builder.config("spark.some.config.option", "some-value")
+            <pyspark.sql.session...
+
+            :param key: a key name string for configuration property
+            :param value: a value for configuration property
+            :param conf: an instance of :class:`SparkConf`
+            """
+            with self._lock:
+                if conf is None:
+                    self._options[key] = str(value)
+                else:
+                    for (k, v) in conf.getAll():
+                        self._options[k] = v
+                return self
+
+        @since(2.0)
+        def master(self, master):
+            """Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]"
+            to run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone
+            cluster.
+
+            :param master: a url for spark master
+            """
+            return self.config("spark.master", master)
+
+        @since(2.0)
+        def appName(self, name):
+            """Sets a name for the application, which will be shown in the Spark web UI.
+
+            :param name: an application name
+            """
+            return self.config("spark.app.name", name)
+
+        @since(2.0)
+        def enableHiveSupport(self):
+            """Enables Hive support, including connectivity to a persistent Hive metastore, support
+            for Hive serdes, and Hive user-defined functions.
+            """
+            return self.config("spark.sql.catalogImplementation", "hive")
+
+        @since(2.0)
+        def getOrCreate(self):
+            """Gets an existing :class:`SparkSession` or, if there is no existing one, creates a new
+            one based on the options set in this builder.
+            """
+            with self._lock:
+                from pyspark.conf import SparkConf
+                from pyspark.context import SparkContext
+                from pyspark.sql.context import SQLContext
+                sparkConf = SparkConf()
+                for key, value in self._options.items():
+                    sparkConf.set(key, value)
+                sparkContext = SparkContext.getOrCreate(sparkConf)
+                return SQLContext.getOrCreate(sparkContext).sparkSession
+
+    builder = Builder()
+
     _instantiatedContext = None
 
     @ignore_unicode_prefix
@@ -445,6 +528,12 @@ def read(self):
         """
         return DataFrameReader(self._wrapped)
 
+    @since(2.0)
+    def stop(self):
+        """Stop the underlying :class:`SparkContext`.
+        """
+        self._sc.stop()
+
 
 def _test():
     import os
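
A minimal usage sketch of the Builder added above, assuming a local run; the master URL and config key are illustrative placeholders, not values required by the patch.

from pyspark.sql import SparkSession

# Each builder method funnels into config(), which records options in a class-level
# dict guarded by an RLock; getOrCreate() copies them into a SparkConf, reuses or
# creates the SparkContext, and returns the SparkSession held by the shared SQLContext.
spark = SparkSession.builder \
    .master("local[2]") \
    .appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# A second getOrCreate() goes through SparkContext.getOrCreate, so it reuses the
# running context instead of starting another one.
spark_again = SparkSession.builder.getOrCreate()

# New in this commit: stop() shuts down the underlying SparkContext.
spark.stop()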
