4 changes: 2 additions & 2 deletions dev/audit-release/sbt_app_sql/src/main/scala/SqlApp.scala
@@ -41,7 +41,7 @@ object SparkSqlExample {
import sqlContext._

val people = sc.makeRDD(1 to 100, 10).map(x => Person(s"Name$x", x)).toDF()
people.registerTempTable("people")
people.createOrReplaceTempView("people")
val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
val teenagerNames = teenagers.map(t => "Name: " + t(0)).collect()
teenagerNames.foreach(println)
@@ -52,7 +52,7 @@ object SparkSqlExample {
System.exit(-1)
}
}

test(teenagerNames.size == 7, "Unexpected number of selected elements: " + teenagerNames)
println("Test succeeded")
sc.stop()
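
The pattern this PR substitutes throughout the examples is roughly the following; a minimal, self-contained sketch (assuming Spark 2.0's SparkSession API; the object and names below are illustrative, not taken from any file in this diff):

import org.apache.spark.sql.SparkSession

// Illustrative only: register a DataFrame as a temporary view, query it with SQL,
// then drop the view through the catalog.
case class Person(name: String, age: Int)

object TempViewSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TempViewSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    val people = (1 to 100).map(i => Person(s"Name$i", i)).toDF()

    // Spark 2.0 naming: a temporary *view*, not a "temp table".
    people.createOrReplaceTempView("people")

    val teenagers = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
    teenagers.collect().foreach(println)

    spark.catalog.dropTempView("people")
    spark.stop()
  }
}
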
@@ -75,7 +75,7 @@ public Person call(String line) {

// Apply a schema to an RDD of Java Beans and register it as a table.
Dataset<Row> schemaPeople = spark.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");
schemaPeople.createOrReplaceTempView("people");

// SQL can be run over RDDs that have been registered as tables.
Dataset<Row> teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
@@ -102,7 +102,7 @@ public String call(Row row) {
Dataset<Row> parquetFile = spark.read().parquet("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
parquetFile.createOrReplaceTempView("parquetFile");
Dataset<Row> teenagers2 =
spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
@@ -131,7 +131,7 @@ public String call(Row row) {
// |-- name: StringType

// Register this DataFrame as a table.
peopleFromJsonFile.registerTempTable("people");
peopleFromJsonFile.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by `spark`
Dataset<Row> teenagers3 = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
@@ -163,7 +163,7 @@ public String call(Row row) {
// | |-- state: StringType
// |-- name: StringType

peopleFromJsonRDD.registerTempTable("people2");
peopleFromJsonRDD.createOrReplaceTempView("people2");

Dataset<Row> peopleWithCity = spark.sql("SELECT name, address.city FROM people2");
List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
@@ -95,7 +95,7 @@ public JavaRecord call(String word) {
Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class);

// Register as table
wordsDataFrame.registerTempTable("words");
wordsDataFrame.createOrReplaceTempView("words");

// Do word count on table using SQL and print it
Dataset<Row> wordCountsDataFrame =
2 changes: 1 addition & 1 deletion examples/src/main/python/sql.py
@@ -67,7 +67,7 @@
# |-- name: string (nullable = true)

# Register this DataFrame as a temporary table.
people.registerTempTable("people")
people.createOrReplaceTempView("people")

# SQL statements can be run by using the sql methods provided by `spark`
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
@@ -71,7 +71,7 @@ def process(time, rdd):
wordsDataFrame = spark.createDataFrame(rowRdd)

# Register as table
wordsDataFrame.registerTempTable("words")
wordsDataFrame.createOrReplaceTempView("words")

# Do word count on table using SQL and print it
wordCountsDataFrame = \
@@ -37,7 +37,7 @@ object RDDRelation {
val df = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
// Any RDD containing case classes can be registered as a table. The schema of the table is
// automatically inferred using scala reflection.
df.registerTempTable("records")
df.createOrReplaceTempView("records")

// Once tables have been registered, you can run SQL queries over them.
println("Result of SELECT *:")
@@ -67,7 +67,7 @@ object RDDRelation {
parquetFile.where($"key" === 1).select($"value".as("a")).collect().foreach(println)

// These files can also be registered as tables.
parquetFile.registerTempTable("parquetFile")
parquetFile.createOrReplaceTempView("parquetFile")
spark.sql("SELECT * FROM parquetFile").collect().foreach(println)

spark.stop()
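
As RDDRelation above shows, the renamed call leaves the rest of the flow unchanged: the same data can be queried through SQL over the registered view or through the DataFrame API. A small hedged sketch of that equivalence (assuming a SparkSession `spark` with its implicits imported, as in the example; the names are illustrative):

import spark.implicits._

val records = (1 to 10).map(i => (i, s"val_$i")).toDF("key", "value")
records.createOrReplaceTempView("records")

// Same rows either way: SQL over the temporary view, or the DataFrame API directly.
val viaSql = spark.sql("SELECT key, value FROM records WHERE key < 5")
val viaApi = records.where($"key" < 5).select($"key", $"value")
viaSql.collect().foreach(println)
viaApi.collect().foreach(println)
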
@@ -67,7 +67,7 @@ object SqlNetworkWordCount {
val wordsDataFrame = rdd.map(w => Record(w)).toDF()

// Register as table
wordsDataFrame.registerTempTable("words")
wordsDataFrame.createOrReplaceTempView("words")

// Do word count on table using SQL and print it
val wordCountsDataFrame =
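
The streaming examples (the Java and Scala network word count variants in this diff) apply the rename inside each micro-batch: every incoming RDD is turned into a DataFrame, registered as a temporary view, and queried with SQL. A rough per-batch sketch (assumptions: a DStream[String] named `words`; the singleton-session helper from the example is replaced here with a plain getOrCreate call):

import org.apache.spark.sql.SparkSession

case class Record(word: String)

words.foreachRDD { rdd =>
  val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  val wordsDataFrame = rdd.map(w => Record(w)).toDF()

  // The view is re-registered on every batch, so the "replace" variant is the right call here.
  wordsDataFrame.createOrReplaceTempView("words")

  val wordCountsDataFrame = spark.sql("SELECT word, count(*) AS total FROM words GROUP BY word")
  wordCountsDataFrame.show()
}
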
@@ -48,6 +48,7 @@ class SQLTransformer @Since("1.6.0") (override val uid: String) extends Transfor

/**
* SQL statement parameter. The statement is provided in string form.
*
* @group param
*/
@Since("1.6.0")
@@ -66,7 +67,7 @@ class SQLTransformer @Since("1.6.0") (override val uid: String) extends Transfor
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
val tableName = Identifiable.randomUID(uid)
dataset.registerTempTable(tableName)
dataset.createOrReplaceTempView(tableName)
val realStatement = $(statement).replace(tableIdentifier, tableName)
dataset.sparkSession.sql(realStatement)
}
@@ -79,7 +80,7 @@ class SQLTransformer @Since("1.6.0") (override val uid: String) extends Transfor
val dummyDF = sqlContext.createDataFrame(dummyRDD, schema)
val tableName = Identifiable.randomUID(uid)
val realStatement = $(statement).replace(tableIdentifier, tableName)
dummyDF.registerTempTable(tableName)
dummyDF.createOrReplaceTempView(tableName)
val outputSchema = sqlContext.sql(realStatement).schema
sqlContext.dropTempTable(tableName)
outputSchema
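
For context on the two SQLTransformer call sites above: transform() registers the input Dataset under a random temporary view name and substitutes that name for the placeholder in the user's statement before handing it to spark.sql. Typical usage looks roughly like this (a sketch modeled on the ML feature docs; the __THIS__ placeholder is assumed to be the value of tableIdentifier, and the column names are made up):

import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

// "__THIS__" stands for the temporary view that transform() creates from df on the fly.
val sqlTrans = new SQLTransformer()
  .setStatement("SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

sqlTrans.transform(df).show()
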
26 changes: 6 additions & 20 deletions python/pyspark/sql/catalog.py
@@ -166,34 +166,20 @@ def createExternalTable(self, tableName, path=None, source=None, schema=None, **
return DataFrame(df, self._sparkSession._wrapped)

@since(2.0)
def dropTempTable(self, tableName):
"""Drops the temporary table with the given table name in the catalog.
If the table has been cached before, then it will also be uncached.
def dropTempView(self, viewName):
"""Drops the temporary view with the given view name in the catalog.
If the view has been cached before, then it will also be uncached.

>>> spark.createDataFrame([(1, 1)]).registerTempTable("my_table")
>>> spark.createDataFrame([(1, 1)]).createTempView("my_table")
>>> spark.table("my_table").collect()
[Row(_1=1, _2=1)]
>>> spark.catalog.dropTempTable("my_table")
>>> spark.catalog.dropTempView("my_table")
>>> spark.table("my_table") # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
AnalysisException: ...
"""
self._jcatalog.dropTempTable(tableName)

@since(2.0)
def registerTable(self, df, tableName):
"""Registers the given :class:`DataFrame` as a temporary table in the catalog.

>>> df = spark.createDataFrame([(2, 1), (3, 1)])
>>> spark.catalog.registerTable(df, "my_cool_table")
>>> spark.table("my_cool_table").collect()
[Row(_1=2, _2=1), Row(_1=3, _2=1)]
"""
if isinstance(df, DataFrame):
self._jsparkSession.registerTable(df._jdf, tableName)
else:
raise ValueError("Can only register DataFrame as table")
self._jcatalog.dropTempView(viewName)

@ignore_unicode_prefix
@since(2.0)
4 changes: 2 additions & 2 deletions python/pyspark/sql/context.py
@@ -302,7 +302,7 @@ def registerDataFrameAsTable(self, df, tableName):

>>> sqlContext.registerDataFrameAsTable(df, "table1")
"""
self.sparkSession.catalog.registerTable(df, tableName)
df.createOrReplaceTempView(tableName)

@since(1.6)
def dropTempTable(self, tableName):
@@ -311,7 +311,7 @@ def dropTempTable(self, tableName):
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> sqlContext.dropTempTable("table1")
"""
self.sparkSession.catalog.dropTempTable(tableName)
self.sparkSession.catalog.dropTempView(tableName)

@since(1.3)
def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
51 changes: 48 additions & 3 deletions python/pyspark/sql/dataframe.py
@@ -119,11 +119,55 @@ def registerTempTable(self, name):
that was used to create this :class:`DataFrame`.

>>> df.registerTempTable("people")
>>> df2 = sqlContext.sql("select * from people")
>>> df2 = spark.sql("select * from people")
>>> sorted(df.collect()) == sorted(df2.collect())
True
>>> spark.catalog.dropTempView("people")

.. note:: Deprecated in 2.0, use createOrReplaceTempView instead.
"""
self._jdf.createOrReplaceTempView(name)

@since(2.0)
def createTempView(self, name):
"""Creates a temporary view with this DataFrame.

The lifetime of this temporary table is tied to the :class:`SparkSession`
that was used to create this :class:`DataFrame`.
throws :class:`TempTableAlreadyExistsException`, if the view name already exists in the
catalog.

>>> df.createTempView("people")
>>> df2 = spark.sql("select * from people")
>>> sorted(df.collect()) == sorted(df2.collect())
True
>>> df.createTempView("people") # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
Py4JJavaError: ...
: org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException...
>>> spark.catalog.dropTempView("people")

"""
self._jdf.createTempView(name)

@since(2.0)
def createOrReplaceTempView(self, name):
"""Creates or replaces a temporary view with this DataFrame.

The lifetime of this temporary table is tied to the :class:`SparkSession`
that was used to create this :class:`DataFrame`.

>>> df.createOrReplaceTempView("people")
>>> df2 = df.filter(df.age > 3)
>>> df2.createOrReplaceTempView("people")
>>> df3 = spark.sql("select * from people")
>>> sorted(df3.collect()) == sorted(df2.collect())
True
>>> spark.catalog.dropTempView("people")

"""
self._jdf.registerTempTable(name)
self._jdf.createOrReplaceTempView(name)

@property
@since(1.4)
@@ -1479,12 +1523,13 @@ def sampleBy(self, col, fractions, seed=None):
def _test():
import doctest
from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext
from pyspark.sql import Row, SQLContext, SparkSession
import pyspark.sql.dataframe
globs = pyspark.sql.dataframe.__dict__.copy()
sc = SparkContext('local[4]', 'PythonTest')
globs['sc'] = sc
globs['sqlContext'] = SQLContext(sc)
globs['spark'] = SparkSession(sc)
globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')])\
.toDF(StructType([StructField('age', IntegerType()),
StructField('name', StringType())]))
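
The doctests added above encode the behavioral difference between the two new methods: createTempView fails if the name is already registered, while createOrReplaceTempView overwrites it. The same contrast holds in the Scala API; a minimal sketch (assuming a SparkSession `spark` and a DataFrame `df`, and assuming TempTableAlreadyExistsException surfaces as an AnalysisException on the Scala side):

df.createOrReplaceTempView("people")   // creates the view
df.createOrReplaceTempView("people")   // safe to call again: the existing view is replaced

// createTempView is the strict variant: registering an already existing name throws.
try {
  df.createTempView("people")
} catch {
  case e: org.apache.spark.sql.AnalysisException =>
    println(s"view already exists: ${e.getMessage}")
}

spark.catalog.dropTempView("people")
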
6 changes: 3 additions & 3 deletions python/pyspark/sql/session.py
@@ -160,7 +160,7 @@ def __init__(self, sparkContext, jsparkSession=None):
... b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1),
... time=datetime(2014, 8, 1, 14, 1, 5))])
>>> df = allTypes.toDF()
>>> df.registerTempTable("allTypes")
>>> df.createOrReplaceTempView("allTypes")
>>> spark.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a '
... 'from allTypes where b and i > 0').collect()
[Row((i + CAST(1 AS BIGINT))=2, (d + CAST(1 AS DOUBLE))=2.0, (NOT b)=False, list[1]=2, \
@@ -484,7 +484,7 @@ def sql(self, sqlQuery):

:return: :class:`DataFrame`

>>> spark.catalog.registerTable(df, "table1")
>>> df.createOrReplaceTempView("table1")
>>> df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
@@ -497,7 +497,7 @@ def table(self, tableName):

:return: :class:`DataFrame`

>>> spark.catalog.registerTable(df, "table1")
>>> df.createOrReplaceTempView("table1")
>>> df2 = spark.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
True
@@ -35,7 +35,7 @@ import org.apache.spark.sql.types.IntegerType
* ("a", "ca1", "cb2", 5),
* ("b", "ca1", "cb1", 13))
* .toDF("key", "cat1", "cat2", "value")
* data.registerTempTable("data")
* data.createOrReplaceTempView("data")
*
* val agg = data.groupBy($"key")
* .agg(
@@ -315,7 +315,7 @@ class SessionCatalog(
/**
* Create a temporary table.
*/
def createTempTable(
def createTempView(
name: String,
tableDefinition: LogicalPlan,
overrideIfExists: Boolean): Unit = synchronized {
@@ -31,7 +31,7 @@ trait AnalysisTest extends PlanTest {
private def makeAnalyzer(caseSensitive: Boolean): Analyzer = {
val conf = new SimpleCatalystConf(caseSensitive)
val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
catalog.createTempTable("TaBlE", TestRelations.testRelation, overrideIfExists = true)
catalog.createTempView("TaBlE", TestRelations.testRelation, overrideIfExists = true)
new Analyzer(catalog, conf) {
override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
}
@@ -52,7 +52,7 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
private val b: Expression = UnresolvedAttribute("b")

before {
catalog.createTempTable("table", relation, overrideIfExists = true)
catalog.createTempView("table", relation, overrideIfExists = true)
}

private def checkType(expression: Expression, expectedType: DataType): Unit = {