
Commit ebc358c

gatorsmile and Ngone51 committed
[SPARK-31086][SQL] Add Back the Deprecated SQLContext methods
### What changes were proposed in this pull request?

Based on the discussion on the mailing list thread [[Proposal] Modification to Spark's Semantic Versioning Policy](http://apache-spark-developers-list.1001551.n3.nabble.com/Proposal-Modification-to-Spark-s-Semantic-Versioning-Policy-td28938.html), this PR adds back the following APIs, whose maintenance cost is relatively small:

- SQLContext.applySchema
- SQLContext.parquetFile
- SQLContext.jsonFile
- SQLContext.jsonRDD
- SQLContext.load
- SQLContext.jdbc

### Why are the changes needed?

To avoid breaking APIs that are commonly used.

### Does this PR introduce any user-facing change?

Adding back the APIs that were removed in the 3.0 branch does not introduce user-facing changes, because Spark 3.0 has not been released.

### How was this patch tested?

The existing tests.

Closes #27839 from gatorsmile/addAPIBackV3.

Lead-authored-by: gatorsmile <[email protected]>
Co-authored-by: yi.wu <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
(cherry picked from commit b7e4cc7)
Signed-off-by: gatorsmile <[email protected]>
1 parent b300ed8 · commit ebc358c
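Each restored method is a thin shim over the `DataFrameReader` API it was deprecated in favor of, so migrating code can mix the two styles during a transition. A minimal sketch of the correspondence (the session setup and path below are illustrative, not taken from this commit):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative session and path, for demonstration only.
val spark = SparkSession.builder().master("local[*]").appName("deprecated-api-demo").getOrCreate()
val sqlContext = spark.sqlContext
val path = "/tmp/events.parquet"

// Restored deprecated API: compiles again on branch-3.0, with a deprecation warning.
val legacy = sqlContext.parquetFile(path)

// The replacement recommended since 1.4.0, which the restored method delegates to.
val modern = spark.read.parquet(path)
```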

File tree

3 files changed: 403 additions & 0 deletions

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 283 additions & 0 deletions
@@ -611,6 +611,289 @@ class SQLContext private[sql](val sparkSession: SparkSession)

```scala
    sessionState.catalog.listTables(databaseName).map(_.table).toArray
  }

  ////////////////////////////////////////////////////////////////////////////
  ////////////////////////////////////////////////////////////////////////////
  // Deprecated methods
  ////////////////////////////////////////////////////////////////////////////
  ////////////////////////////////////////////////////////////////////////////

  /**
   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
   */
  @deprecated("Use createDataFrame instead.", "1.3.0")
  def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
    createDataFrame(rowRDD, schema)
  }

  /**
   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
   */
  @deprecated("Use createDataFrame instead.", "1.3.0")
  def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
    createDataFrame(rowRDD, schema)
  }

  /**
   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
   */
  @deprecated("Use createDataFrame instead.", "1.3.0")
  def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
    createDataFrame(rdd, beanClass)
  }

  /**
   * @deprecated As of 1.3.0, replaced by `createDataFrame()`.
   */
  @deprecated("Use createDataFrame instead.", "1.3.0")
  def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
    createDataFrame(rdd, beanClass)
  }

  /**
   * Loads a Parquet file, returning the result as a `DataFrame`. This function returns an empty
   * `DataFrame` if no paths are passed in.
   *
   * @group specificdata
   * @deprecated As of 1.4.0, replaced by `read().parquet()`.
   */
  @deprecated("Use read.parquet() instead.", "1.4.0")
  @scala.annotation.varargs
  def parquetFile(paths: String*): DataFrame = {
    if (paths.isEmpty) {
      emptyDataFrame
    } else {
      read.parquet(paths : _*)
    }
  }

  /**
   * Loads a JSON file (one object per line), returning the result as a `DataFrame`.
   * It goes through the entire dataset once to determine the schema.
   *
   * @group specificdata
   * @deprecated As of 1.4.0, replaced by `read().json()`.
   */
  @deprecated("Use read.json() instead.", "1.4.0")
  def jsonFile(path: String): DataFrame = {
    read.json(path)
  }

  /**
   * Loads a JSON file (one object per line) and applies the given schema,
   * returning the result as a `DataFrame`.
   *
   * @group specificdata
   * @deprecated As of 1.4.0, replaced by `read().json()`.
   */
  @deprecated("Use read.json() instead.", "1.4.0")
  def jsonFile(path: String, schema: StructType): DataFrame = {
    read.schema(schema).json(path)
  }

  /**
   * @group specificdata
   * @deprecated As of 1.4.0, replaced by `read().json()`.
   */
  @deprecated("Use read.json() instead.", "1.4.0")
  def jsonFile(path: String, samplingRatio: Double): DataFrame = {
    read.option("samplingRatio", samplingRatio.toString).json(path)
  }

  /**
   * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
   * `DataFrame`.
   * It goes through the entire dataset once to determine the schema.
   *
   * @group specificdata
   * @deprecated As of 1.4.0, replaced by `read().json()`.
   */
  @deprecated("Use read.json() instead.", "1.4.0")
  def jsonRDD(json: RDD[String]): DataFrame = read.json(json)

  /**
   * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
   * `DataFrame`.
   * It goes through the entire dataset once to determine the schema.
   *
   * @group specificdata
   * @deprecated As of 1.4.0, replaced by `read().json()`.
   */
  @deprecated("Use read.json() instead.", "1.4.0")
  def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)

  /**
   * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
   * returning the result as a `DataFrame`.
   *
   * @group specificdata
   * @deprecated As of 1.4.0, replaced by `read().json()`.
   */
  @deprecated("Use read.json() instead.", "1.4.0")
  def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
    read.schema(schema).json(json)
  }

  /**
   * Loads a JavaRDD[String] storing JSON objects (one object per record) and applies the given
   * schema, returning the result as a `DataFrame`.
   *
   * @group specificdata
   * @deprecated As of 1.4.0, replaced by `read().json()`.
   */
  @deprecated("Use read.json() instead.", "1.4.0")
  def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
    read.schema(schema).json(json)
  }

  /**
   * Loads an RDD[String] storing JSON objects (one object per record) inferring the
   * schema, returning the result as a `DataFrame`.
   *
   * @group specificdata
   * @deprecated As of 1.4.0, replaced by `read().json()`.
   */
  @deprecated("Use read.json() instead.", "1.4.0")
  def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
    read.option("samplingRatio", samplingRatio.toString).json(json)
  }

  /**
   * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
   * schema, returning the result as a `DataFrame`.
   *
   * @group specificdata
   * @deprecated As of 1.4.0, replaced by `read().json()`.
   */
  @deprecated("Use read.json() instead.", "1.4.0")
  def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
    read.option("samplingRatio", samplingRatio.toString).json(json)
  }

  /**
   * Returns the dataset stored at path as a DataFrame,
   * using the default data source configured by spark.sql.sources.default.
   *
   * @group genericdata
   * @deprecated As of 1.4.0, replaced by `read().load(path)`.
   */
  @deprecated("Use read.load(path) instead.", "1.4.0")
  def load(path: String): DataFrame = {
    read.load(path)
  }

  /**
   * Returns the dataset stored at path as a DataFrame, using the given data source.
   *
   * @group genericdata
   * @deprecated As of 1.4.0, replaced by `read().format(source).load(path)`.
   */
  @deprecated("Use read.format(source).load(path) instead.", "1.4.0")
  def load(path: String, source: String): DataFrame = {
    read.format(source).load(path)
  }

  /**
   * (Java-specific) Returns the dataset specified by the given data source and
   * a set of options as a DataFrame.
   *
   * @group genericdata
   * @deprecated As of 1.4.0, replaced by `read().format(source).options(options).load()`.
   */
  @deprecated("Use read.format(source).options(options).load() instead.", "1.4.0")
  def load(source: String, options: java.util.Map[String, String]): DataFrame = {
    read.options(options).format(source).load()
  }

  /**
   * (Scala-specific) Returns the dataset specified by the given data source and
   * a set of options as a DataFrame.
   *
   * @group genericdata
   * @deprecated As of 1.4.0, replaced by `read().format(source).options(options).load()`.
   */
  @deprecated("Use read.format(source).options(options).load() instead.", "1.4.0")
  def load(source: String, options: Map[String, String]): DataFrame = {
    read.options(options).format(source).load()
  }

  /**
   * (Java-specific) Returns the dataset specified by the given data source and
   * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
   *
   * @group genericdata
   * @deprecated As of 1.4.0, replaced by
   *             `read().format(source).schema(schema).options(options).load()`.
   */
  @deprecated("Use read.format(source).schema(schema).options(options).load() instead.", "1.4.0")
  def load(
      source: String,
      schema: StructType,
      options: java.util.Map[String, String]): DataFrame = {
    read.format(source).schema(schema).options(options).load()
  }

  /**
   * (Scala-specific) Returns the dataset specified by the given data source and
   * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
   *
   * @group genericdata
   * @deprecated As of 1.4.0, replaced by
   *             `read().format(source).schema(schema).options(options).load()`.
   */
  @deprecated("Use read.format(source).schema(schema).options(options).load() instead.", "1.4.0")
  def load(source: String, schema: StructType, options: Map[String, String]): DataFrame = {
    read.format(source).schema(schema).options(options).load()
  }

  /**
   * Construct a `DataFrame` representing the database table accessible via JDBC URL
   * `url`, named `table`.
   *
   * @group specificdata
   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
   */
  @deprecated("Use read.jdbc() instead.", "1.4.0")
  def jdbc(url: String, table: String): DataFrame = {
    read.jdbc(url, table, new Properties)
  }

  /**
   * Construct a `DataFrame` representing the database table accessible via JDBC URL
   * `url`, named `table`. Partitions of the table will be retrieved in parallel based on the
   * parameters passed to this function.
   *
   * @param columnName the name of a column of integral type that will be used for partitioning.
   * @param lowerBound the minimum value of `columnName` used to decide partition stride
   * @param upperBound the maximum value of `columnName` used to decide partition stride
   * @param numPartitions the number of partitions. The range `lowerBound`-`upperBound` will be
   *                      split evenly into this many partitions
   * @group specificdata
   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
   */
  @deprecated("Use read.jdbc() instead.", "1.4.0")
  def jdbc(
      url: String,
      table: String,
      columnName: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int): DataFrame = {
    read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, new Properties)
  }

  /**
   * Construct a `DataFrame` representing the database table accessible via JDBC URL
   * `url`, named `table`. The `theParts` parameter gives a list of expressions
   * suitable for inclusion in WHERE clauses; each one defines one partition
   * of the `DataFrame`.
   *
   * @group specificdata
   * @deprecated As of 1.4.0, replaced by `read().jdbc()`.
   */
  @deprecated("Use read.jdbc() instead.", "1.4.0")
  def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
    read.jdbc(url, table, theParts, new Properties)
  }
}
```
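Of the restored methods, the partitioned `jdbc` variant carries the most semantics: Spark splits the `[lowerBound, upperBound]` range of `columnName` into `numPartitions` even strides and issues one query per stride; the bounds only shape the stride and do not filter rows. A sketch of the equivalent modern `read.jdbc` call, assuming a `SparkSession` named `spark` (the URL, table, and bounds are placeholders):

```scala
import java.util.Properties

// Placeholders: any reachable JDBC endpoint with an integral partition column works.
val ordersDF = spark.read.jdbc(
  "jdbc:postgresql://db-host:5432/shop",  // url
  "orders",                               // table
  "order_id",                             // columnName: integral column used for partitioning
  0L,                                     // lowerBound: used only to compute the stride
  1000000L,                               // upperBound: used only to compute the stride
  8,                                      // numPartitions: eight parallel range queries
  new Properties())                       // connection properties (user, password, ...)
```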
sql/core/src/test/scala/org/apache/spark/sql/DeprecatedAPISuite.scala

Lines changed: 106 additions & 0 deletions

@@ -0,0 +1,106 @@
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql

import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

class DeprecatedAPISuite extends QueryTest with SharedSparkSession {

  test("SQLContext.applySchema") {
    val rowRdd = sparkContext.parallelize(Seq(Row("Jack", 20), Row("Marry", 18)))
    val schema = StructType(StructField("name", StringType, false) ::
      StructField("age", IntegerType, true) :: Nil)
    val sqlContext = spark.sqlContext
    checkAnswer(sqlContext.applySchema(rowRdd, schema), Row("Jack", 20) :: Row("Marry", 18) :: Nil)
    checkAnswer(sqlContext.applySchema(rowRdd.toJavaRDD(), schema),
      Row("Jack", 20) :: Row("Marry", 18) :: Nil)
  }

  test("SQLContext.parquetFile") {
    val sqlContext = spark.sqlContext
    withTempDir { dir =>
      val parquetFile = s"${dir.toString}/${System.currentTimeMillis()}"
      val expectDF = spark.range(10).toDF()
      expectDF.write.parquet(parquetFile)
      val parquetDF = sqlContext.parquetFile(parquetFile)
      checkAnswer(parquetDF, expectDF)
    }
  }

  test("SQLContext.jsonFile") {
    val sqlContext = spark.sqlContext
    withTempDir { dir =>
      val jsonFile = s"${dir.toString}/${System.currentTimeMillis()}"
      val expectDF = spark.range(10).toDF()
      expectDF.write.json(jsonFile)
      var jsonDF = sqlContext.jsonFile(jsonFile)
      checkAnswer(jsonDF, expectDF)
      assert(jsonDF.schema === expectDF.schema.asNullable)

      var schema = expectDF.schema
      jsonDF = sqlContext.jsonFile(jsonFile, schema)
      checkAnswer(jsonDF, expectDF)
      assert(jsonDF.schema === schema.asNullable)

      jsonDF = sqlContext.jsonFile(jsonFile, 0.9)
      checkAnswer(jsonDF, expectDF)

      val jsonRDD = sparkContext.parallelize(Seq("{\"name\":\"Jack\",\"age\":20}",
        "{\"name\":\"Marry\",\"age\":18}"))
      jsonDF = sqlContext.jsonRDD(jsonRDD)
      checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
      jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD())
      checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)

      schema = StructType(StructField("name", StringType, false) ::
        StructField("age", IntegerType, false) :: Nil)
      jsonDF = sqlContext.jsonRDD(jsonRDD, schema)
      checkAnswer(jsonDF, Row("Jack", 20) :: Row("Marry", 18) :: Nil)
      jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD(), schema)
      checkAnswer(jsonDF, Row("Jack", 20) :: Row("Marry", 18) :: Nil)

      jsonDF = sqlContext.jsonRDD(jsonRDD, 0.9)
      checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
      jsonDF = sqlContext.jsonRDD(jsonRDD.toJavaRDD(), 0.9)
      checkAnswer(jsonDF, Row(18, "Marry") :: Row(20, "Jack") :: Nil)
    }
  }

  test("SQLContext.load") {
    withTempDir { dir =>
      val path = s"${dir.toString}/${System.currentTimeMillis()}"
      val expectDF = spark.range(10).toDF()
      expectDF.write.parquet(path)
      val sqlContext = spark.sqlContext

      var loadDF = sqlContext.load(path)
      checkAnswer(loadDF, expectDF)

      loadDF = sqlContext.load(path, "parquet")
      checkAnswer(loadDF, expectDF)

      loadDF = sqlContext.load("parquet", Map("path" -> path))
      checkAnswer(loadDF, expectDF)

      loadDF = sqlContext.load("parquet", expectDF.schema, Map("path" -> path))
      checkAnswer(loadDF, expectDF)
    }
  }
}
```
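One detail of the suite worth noting: the `jsonFile` test compares schemas against `expectDF.schema.asNullable` because JSON schema inference marks every inferred field nullable; `asNullable` is internal to Spark, which is presumably why this suite can call it from the `org.apache.spark.sql` package. Outside Spark's own packages, the same flat-schema relaxation can be written by hand, as in this sketch:

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val strict = StructType(StructField("id", IntegerType, nullable = false) :: Nil)

// Relax every top-level field to nullable, mirroring what JSON inference returns.
// (Nested structs would need a recursive walk; this handles only the flat case.)
val relaxed = StructType(strict.fields.map(_.copy(nullable = true)))
assert(relaxed.fields.forall(_.nullable))
```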
