283 changes: 283 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -611,6 +611,289 @@ class SQLContext private[sql](val sparkSession: SparkSession)
sessionState.catalog.listTables(databaseName).map(_.table).toArray
}

////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// Deprecated methods
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////

/**
* @deprecated As of 1.3.0, replaced by `createDataFrame()`.
*/
@deprecated("Use createDataFrame instead.", "1.3.0")
def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
createDataFrame(rowRDD, schema)
}

Review thread on the @deprecated annotation above:

Member:
Hi, @marmbrus and @srowen.
Do you really need to keep the 1.3.0 API?

Member:
I agree; I also don't feel strongly about it, as there isn't much overhead to keeping it beyond API noise.
@gatorsmile may have some additional info or data about this.

Contributor:
The rubric deliberately says nothing about how old the deprecation warning is. It says we should think about usage.

@gatorsmile, do you have any reason to believe these are commonly used functions?

/**
* @deprecated As of 1.3.0, replaced by `createDataFrame()`.
*/
@deprecated("Use createDataFrame instead.", "1.3.0")
def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
createDataFrame(rowRDD, schema)
}

/**
* @deprecated As of 1.3.0, replaced by `createDataFrame()`.
*/
@deprecated("Use createDataFrame instead.", "1.3.0")
def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
createDataFrame(rdd, beanClass)
}

/**
* @deprecated As of 1.3.0, replaced by `createDataFrame()`.
*/
@deprecated("Use createDataFrame instead.", "1.3.0")
def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
createDataFrame(rdd, beanClass)
}
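
For context, each applySchema overload above is now a thin shim over the matching createDataFrame overload. A minimal migration sketch, mirroring the new DeprecatedAPISuite test below (assumes an active session exposing sparkContext and sqlContext; names are illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val rowRDD = sparkContext.parallelize(Seq(Row("Jack", 20), Row("Mary", 18)))
val schema = StructType(StructField("name", StringType, false) ::
StructField("age", IntegerType, true) :: Nil)
// Deprecated call: still compiles, but emits a deprecation warning.
val oldDF = sqlContext.applySchema(rowRDD, schema)
// Equivalent replacement, which the shim above delegates to.
val newDF = sqlContext.createDataFrame(rowRDD, schema)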

/**
* Loads a Parquet file, returning the result as a `DataFrame`. This function returns an empty
* `DataFrame` if no paths are passed in.
*
* @group specificdata
* @deprecated As of 1.4.0, replaced by `read().parquet()`.
*/
@deprecated("Use read.parquet() instead.", "1.4.0")
@scala.annotation.varargs
def parquetFile(paths: String*): DataFrame = {
if (paths.isEmpty) {
emptyDataFrame
} else {
read.parquet(paths : _*)
}
}
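
A short usage sketch of the vararg form and the empty edge case it handles (paths illustrative):

// Deprecated: several directories at once via varargs.
val oldDF = sqlContext.parquetFile("/tmp/part1", "/tmp/part2")
// Replacement through the DataFrameReader API.
val newDF = sqlContext.read.parquet("/tmp/part1", "/tmp/part2")
// With no paths, the shim returns emptyDataFrame instead of failing.
val empty = sqlContext.parquetFile()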

/**
* Loads a JSON file (one object per line), returning the result as a `DataFrame`.
* It goes through the entire dataset once to determine the schema.
*
* @group specificdata
* @deprecated As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
def jsonFile(path: String): DataFrame = {
read.json(path)
}

/**
* Loads a JSON file (one object per line) and applies the given schema,
* returning the result as a `DataFrame`.
*
* @group specificdata
* @deprecated As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
def jsonFile(path: String, schema: StructType): DataFrame = {
read.schema(schema).json(path)
}

/**
* @group specificdata
* @deprecated As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
def jsonFile(path: String, samplingRatio: Double): DataFrame = {
read.option("samplingRatio", samplingRatio.toString).json(path)
}
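
Taken together, the three jsonFile overloads line up with the reader API like this (path and schema illustrative):

val inferred = sqlContext.jsonFile("/tmp/in.json")          // read.json(path): one full pass to infer the schema
val typed = sqlContext.jsonFile("/tmp/in.json", schema)     // read.schema(schema).json(path): no inference pass
val sampled = sqlContext.jsonFile("/tmp/in.json", 0.5)      // read.option("samplingRatio", "0.5").json(path)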

/**
* Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
* `DataFrame`.
* It goes through the entire dataset once to determine the schema.
*
* @group specificdata
* @deprecated As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
def jsonRDD(json: RDD[String]): DataFrame = read.json(json)

/**
* Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
* `DataFrame`.
* It goes through the entire dataset once to determine the schema.
*
* @group specificdata
* @deprecated As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)

/**
* Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
* returning the result as a `DataFrame`.
*
* @group specificdata
* @deprecated As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
read.schema(schema).json(json)
}

/**
* Loads a JavaRDD[String] storing JSON objects (one object per record) and applies the given
* schema, returning the result as a `DataFrame`.
*
* @group specificdata
* @deprecated As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
read.schema(schema).json(json)
}

/**
* Loads an RDD[String] storing JSON objects (one object per record) inferring the
* schema, returning the result as a `DataFrame`.
*
* @group specificdata
* @deprecated As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
read.option("samplingRatio", samplingRatio.toString).json(json)
}

/**
* Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
* schema, returning the result as a `DataFrame`.
*
* @group specificdata
* @deprecated As of 1.4.0, replaced by `read().json()`.
*/
@deprecated("Use read.json() instead.", "1.4.0")
def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
read.option("samplingRatio", samplingRatio.toString).json(json)
}
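
A small sketch of the RDD-based variants (assumes sc is the active SparkContext; data illustrative):

val json = sc.parallelize(Seq("""{"name":"Jack","age":20}""", """{"name":"Mary","age":18}"""))
val inferred = sqlContext.jsonRDD(json)        // full pass over the RDD to infer the schema
val sampled = sqlContext.jsonRDD(json, 0.5)    // infers the schema from roughly half of the records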

/**
* Returns the dataset stored at path as a DataFrame,
* using the default data source configured by spark.sql.sources.default.
*
* @group genericdata
* @deprecated As of 1.4.0, replaced by `read().load(path)`.
*/
@deprecated("Use read.load(path) instead.", "1.4.0")
def load(path: String): DataFrame = {
read.load(path)
}

/**
* Returns the dataset stored at path as a DataFrame, using the given data source.
*
* @group genericdata
* @deprecated As of 1.4.0, replaced by `read().format(source).load(path)`.
*/
@deprecated("Use read.format(source).load(path) instead.", "1.4.0")
def load(path: String, source: String): DataFrame = {
read.format(source).load(path)
}

/**
* (Java-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame.
*
* @group genericdata
* @deprecated As of 1.4.0, replaced by `read().format(source).options(options).load()`.
*/
@deprecated("Use read.format(source).options(options).load() instead.", "1.4.0")
def load(source: String, options: java.util.Map[String, String]): DataFrame = {
read.options(options).format(source).load()
}

/**
* (Scala-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame.
*
* @group genericdata
* @deprecated As of 1.4.0, replaced by `read().format(source).options(options).load()`.
*/
@deprecated("Use read.format(source).options(options).load() instead.", "1.4.0")
def load(source: String, options: Map[String, String]): DataFrame = {
read.options(options).format(source).load()
}

/**
* (Java-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
*
* @group genericdata
* @deprecated As of 1.4.0, replaced by
* `read().format(source).schema(schema).options(options).load()`.
*/
@deprecated("Use read.format(source).schema(schema).options(options).load() instead.", "1.4.0")
def load(
source: String,
schema: StructType,
options: java.util.Map[String, String]): DataFrame = {
read.format(source).schema(schema).options(options).load()
}

/**
* (Scala-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
*
* @group genericdata
* @deprecated As of 1.4.0, replaced by
* `read().format(source).schema(schema).options(options).load()`.
*/
@deprecated("Use read.format(source).schema(schema).options(options).load() instead.", "1.4.0")
def load(source: String, schema: StructType, options: Map[String, String]): DataFrame = {
read.format(source).schema(schema).options(options).load()
}
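
For reference, how the load overloads map onto the reader (paths illustrative; mySchema is a hypothetical StructType; parquet is the stock default for spark.sql.sources.default):

// Default source, resolved from spark.sql.sources.default.
val d1 = sqlContext.load("/tmp/t")
// Explicit source; the path travels through the options map.
val d2 = sqlContext.load("parquet", Map("path" -> "/tmp/t"))
// Same, with an explicit schema to skip inference (mySchema is hypothetical).
val d3 = sqlContext.load("parquet", mySchema, Map("path" -> "/tmp/t"))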

/**
* Construct a `DataFrame` representing the database table named `table`, accessible via the
* JDBC URL `url`.
*
* @group specificdata
* @deprecated As of 1.4.0, replaced by `read().jdbc()`.
*/
@deprecated("Use read.jdbc() instead.", "1.4.0")
def jdbc(url: String, table: String): DataFrame = {
read.jdbc(url, table, new Properties)
}

/**
* Construct a `DataFrame` representing the database table named `table`, accessible via the
* JDBC URL `url`. Partitions of the table will be retrieved in parallel based on the
* parameters passed to this function.
*
* @param columnName the name of a column of integral type that will be used for partitioning.
* @param lowerBound the minimum value of `columnName` used to decide partition stride
* @param upperBound the maximum value of `columnName` used to decide partition stride
* @param numPartitions the number of partitions. The range `lowerBound` to `upperBound` will
* be split evenly into this many partitions
* @group specificdata
* @deprecated As of 1.4.0, replaced by `read().jdbc()`.
*/
@deprecated("Use read.jdbc() instead.", "1.4.0")
def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int): DataFrame = {
read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, new Properties)
}
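
To make the stride concrete: the bounds only shape the generated WHERE clauses; they do not filter rows. A sketch with lowerBound = 0, upperBound = 100, numPartitions = 4 on column "id" (URL and table illustrative; the clause text is approximate, following Spark's even-stride JDBC partitioning, where the first partition also picks up NULLs):

val people = sqlContext.jdbc("jdbc:postgresql://dbhost/mydb", "people", "id", 0L, 100L, 4)
// Approximate per-partition predicates:
//   partition 1: id < 25 or id is null
//   partition 2: id >= 25 AND id < 50
//   partition 3: id >= 50 AND id < 75
//   partition 4: id >= 75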

/**
* Construct a `DataFrame` representing the database table named `table`, accessible via the
* JDBC URL `url`. The `theParts` parameter gives a list of expressions suitable for
* inclusion in WHERE clauses; each one defines one partition of the `DataFrame`.
*
* @group specificdata
* @deprecated As of 1.4.0, replaced by `read().jdbc()`.
*/
@deprecated("Use read.jdbc() instead.", "1.4.0")
def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
read.jdbc(url, table, theParts, new Properties)
}
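
And the predicate-based variant, where each WHERE fragment defines exactly one partition (values illustrative):

val byRegion = sqlContext.jdbc("jdbc:postgresql://dbhost/mydb", "sales",
Array("region = 'EU'", "region = 'US'", "region = 'APAC'"))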
}

106 changes: 106 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

class DeprecatedAPISuite extends QueryTest with SharedSparkSession {

test("SQLContext.applySchema") {
val rowRdd = sparkContext.parallelize(Seq(Row("Jack", 20), Row("Marry", 18)))
val schema = StructType(StructField("name", StringType, false) ::
StructField("age", IntegerType, true) :: Nil)
val sqlContext = spark.sqlContext
checkAnswer(sqlContext.applySchema(rowRdd, schema), Row("Jack", 20) :: Row("Marry", 18) :: Nil)
checkAnswer(sqlContext.applySchema(rowRdd.toJavaRDD(), schema),
Row("Jack", 20) :: Row("Marry", 18) :: Nil)
}

test("SQLContext.parquetFile") {
val sqlContext = spark.sqlContext
withTempDir { dir =>
val parquetFile = s"${dir.toString}/${System.currentTimeMillis()}"
val expectDF = spark.range(10).toDF()
expectDF.write.parquet(parquetFile)
val parquetDF = sqlContext.parquetFile(parquetFile)
checkAnswer(parquetDF, expectDF)
}
}

test("SQLContext.jsonFile") {
val sqlContext = spark.sqlContext
withTempDir { dir =>
val jsonFile = s"${dir.toString}/${System.currentTimeMillis()}"
val expectDF = spark.range(10).toDF()
expectDF.write.json(jsonFile)
var jsonDF = sqlContext.jsonFile(jsonFile)
checkAnswer(jsonDF, expectDF)
assert(jsonDF.schema === expectDF.schema.asNullable)

var schema = expectDF.schema
jsonDF = sqlContext.jsonFile(jsonFile, schema)
checkAnswer(jsonDF, expectDF)
assert(jsonDF.schema === schema.asNullable)

jsonDF = sqlContext.jsonFile(jsonFile, 0.9)
checkAnswer(jsonDF, expectDF)

val jsonRDD = sparkContext.parallelize(Seq("{\"name\":\"Jack\",\"age\":20}",
"{\"name\":\"Marry\",\"age\":18}"))
jsonDF = sqlContext.jsonRDD(jsonRDD)
checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD())
checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)

schema = StructType(StructField("name", StringType, false) ::
StructField("age", IntegerType, false) :: Nil)
jsonDF = sqlContext.jsonRDD(jsonRDD, schema)
checkAnswer(jsonDF, Row("Jack", 20) :: Row("Marry", 18) :: Nil)
jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD(), schema)
checkAnswer(jsonDF, Row("Jack", 20) :: Row("Marry", 18) :: Nil)

jsonDF = sqlContext.jsonRDD(jsonRDD, 0.9)
checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD(), 0.9)
checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
}
}

test("SQLContext.load") {
withTempDir { dir =>
val path = s"${dir.toString}/${System.currentTimeMillis()}"
val expectDF = spark.range(10).toDF()
expectDF.write.parquet(path)
val sqlContext = spark.sqlContext

var loadDF = sqlContext.load(path)
checkAnswer(loadDF, expectDF)

loadDF = sqlContext.load(path, "parquet")
checkAnswer(loadDF, expectDF)

loadDF = sqlContext.load("parquet", Map("path" -> path))
checkAnswer(loadDF, expectDF)

loadDF = sqlContext.load("parquet", expectDF.schema, Map("path" -> path))
checkAnswer(loadDF, expectDF)
}
}
}