Skip to content

Commit 294337d

Browse files
committed
add back HiveContext and createExternalTable
1 parent ba86524 commit 294337d

File tree

10 files changed

+447
-11
lines changed

10 files changed

+447
-11
lines changed

docs/sql-migration-guide.md

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,6 @@ license: |
2525
## Upgrading from Spark SQL 2.4 to 3.0
2626
- Since Spark 3.0, when inserting a value into a table column with a different data type, the type coercion is performed as per ANSI SQL standard. Certain unreasonable type conversions such as converting `string` to `int` and `double` to `boolean` are disallowed. A runtime exception will be thrown if the value is out-of-range for the data type of the column. In Spark version 2.4 and earlier, type conversions during table insertion are allowed as long as they are valid `Cast`. When inserting an out-of-range value to a integral field, the low-order bits of the value is inserted(the same as Java/Scala numeric type casting). For example, if 257 is inserted to a field of byte type, the result is 1. The behavior is controlled by the option `spark.sql.storeAssignmentPolicy`, with a default value as "ANSI". Setting the option as "Legacy" restores the previous behavior.
2727

28-
- In Spark 3.0, the deprecated methods `SQLContext.createExternalTable` and `SparkSession.createExternalTable` have been removed in favor of its replacement, `createTable`.
29-
30-
- In Spark 3.0, the deprecated `HiveContext` class has been removed. Use `SparkSession.builder.enableHiveSupport()` instead.
31-
3228
- Since Spark 3.0, configuration `spark.sql.crossJoin.enabled` become internal configuration, and is true by default, so by default spark won't raise exception on sql with implicit cross join.
3329

3430
- Since Spark 3.0, we reversed argument order of the trim function from `TRIM(trimStr, str)` to `TRIM(str, trimStr)` to be compatible with other databases.

project/MimaExcludes.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ object MimaExcludes {
5252
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ExecutorPlugin"),
5353

5454
// [SPARK-28980][SQL][CORE][MLLIB] Remove more old deprecated items in Spark 3
55-
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.createExternalTable"),
56-
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.createExternalTable"),
5755
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.KMeans.train"),
5856
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.clustering.KMeans.train"),
5957
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.classification.LogisticRegressionWithSGD$"),

python/pyspark/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def wrapper(self, *args, **kwargs):
113113

114114

115115
# for back compatibility
116-
from pyspark.sql import SQLContext, Row
116+
from pyspark.sql import SQLContext, HiveContext, Row
117117

118118
__all__ = [
119119
"SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast",

python/pyspark/sql/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343

4444

4545
from pyspark.sql.types import Row
46-
from pyspark.sql.context import SQLContext, UDFRegistration
46+
from pyspark.sql.context import SQLContext, HiveContext, UDFRegistration
4747
from pyspark.sql.session import SparkSession
4848
from pyspark.sql.column import Column
4949
from pyspark.sql.catalog import Catalog
@@ -55,7 +55,7 @@
5555

5656

5757
__all__ = [
58-
'SparkSession', 'SQLContext', 'UDFRegistration',
58+
'SparkSession', 'SQLContext', 'HiveContext', 'UDFRegistration',
5959
'DataFrame', 'GroupedData', 'Column', 'Catalog', 'Row',
6060
'DataFrameNaFunctions', 'DataFrameStatFunctions', 'Window', 'WindowSpec',
6161
'DataFrameReader', 'DataFrameWriter', 'PandasCogroupedOps'

python/pyspark/sql/catalog.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,26 @@ def listColumns(self, tableName, dbName=None):
138138
isBucket=jcolumn.isBucket()))
139139
return columns
140140

141+
@since(2.0)
142+
def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
143+
"""Creates a table based on the dataset in a data source.
144+
145+
It returns the DataFrame associated with the external table.
146+
147+
The data source is specified by the ``source`` and a set of ``options``.
148+
If ``source`` is not specified, the default data source configured by
149+
``spark.sql.sources.default`` will be used.
150+
151+
Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
152+
created external table.
153+
154+
:return: :class:`DataFrame`
155+
"""
156+
warnings.warn(
157+
"createExternalTable is deprecated since Spark 2.2, please use createTable instead.",
158+
DeprecationWarning)
159+
return self.createTable(tableName, path, source, schema, **options)
160+
141161
@since(2.2)
142162
def createTable(self, tableName, path=None, source=None, schema=None, **options):
143163
"""Creates a table based on the dataset in a data source.

python/pyspark/sql/context.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from pyspark.sql.udf import UDFRegistration
3333
from pyspark.sql.utils import install_exception_handler
3434

35-
__all__ = ["SQLContext"]
35+
__all__ = ["SQLContext", "HiveContext"]
3636

3737

3838
class SQLContext(object):
@@ -340,6 +340,24 @@ def dropTempTable(self, tableName):
340340
"""
341341
self.sparkSession.catalog.dropTempView(tableName)
342342

343+
@since(1.3)
344+
def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
345+
"""Creates an external table based on the dataset in a data source.
346+
347+
It returns the DataFrame associated with the external table.
348+
349+
The data source is specified by the ``source`` and a set of ``options``.
350+
If ``source`` is not specified, the default data source configured by
351+
``spark.sql.sources.default`` will be used.
352+
353+
Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
354+
created external table.
355+
356+
:return: :class:`DataFrame`
357+
"""
358+
return self.sparkSession.catalog.createExternalTable(
359+
tableName, path, source, schema, **options)
360+
343361
@ignore_unicode_prefix
344362
@since(1.0)
345363
def sql(self, sqlQuery):
@@ -463,6 +481,53 @@ def streams(self):
463481
return StreamingQueryManager(self._ssql_ctx.streams())
464482

465483

484+
class HiveContext(SQLContext):
485+
"""A variant of Spark SQL that integrates with data stored in Hive.
486+
487+
Configuration for Hive is read from ``hive-site.xml`` on the classpath.
488+
It supports running both SQL and HiveQL commands.
489+
490+
:param sparkContext: The SparkContext to wrap.
491+
:param jhiveContext: An optional JVM Scala HiveContext. If set, we do not instantiate a new
492+
:class:`HiveContext` in the JVM, instead we make all calls to this object.
493+
494+
.. note:: Deprecated in 2.0.0. Use SparkSession.builder.enableHiveSupport().getOrCreate().
495+
"""
496+
497+
def __init__(self, sparkContext, jhiveContext=None):
498+
warnings.warn(
499+
"HiveContext is deprecated in Spark 2.0.0. Please use " +
500+
"SparkSession.builder.enableHiveSupport().getOrCreate() instead.",
501+
DeprecationWarning)
502+
if jhiveContext is None:
503+
sparkContext._conf.set("spark.sql.catalogImplementation", "hive")
504+
sparkSession = SparkSession.builder._sparkContext(sparkContext).getOrCreate()
505+
else:
506+
sparkSession = SparkSession(sparkContext, jhiveContext.sparkSession())
507+
SQLContext.__init__(self, sparkContext, sparkSession, jhiveContext)
508+
509+
@classmethod
510+
def _createForTesting(cls, sparkContext):
511+
"""(Internal use only) Create a new HiveContext for testing.
512+
513+
All test code that touches HiveContext *must* go through this method. Otherwise,
514+
you may end up launching multiple derby instances and encounter with incredibly
515+
confusing error messages.
516+
"""
517+
jsc = sparkContext._jsc.sc()
518+
jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc, False)
519+
return cls(sparkContext, jtestHive)
520+
521+
def refreshTable(self, tableName):
522+
"""Invalidate and refresh all the cached the metadata of the given
523+
table. For performance reasons, Spark SQL or the external data source
524+
library it uses might cache certain metadata about a table, such as the
525+
location of blocks. When those change outside of Spark SQL, users should
526+
call this function to invalidate the cache.
527+
"""
528+
self._ssql_ctx.refreshTable(tableName)
529+
530+
466531
def _test():
467532
import os
468533
import doctest

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,97 @@ class SQLContext private[sql](val sparkSession: SparkSession)
479479
def readStream: DataStreamReader = sparkSession.readStream
480480

481481

482+
/**
483+
* Creates an external table from the given path and returns the corresponding DataFrame.
484+
* It will use the default data source configured by spark.sql.sources.default.
485+
*
486+
* @group ddl_ops
487+
* @since 1.3.0
488+
*/
489+
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
490+
def createExternalTable(tableName: String, path: String): DataFrame = {
491+
sparkSession.catalog.createTable(tableName, path)
492+
}
493+
494+
/**
495+
* Creates an external table from the given path based on a data source
496+
* and returns the corresponding DataFrame.
497+
*
498+
* @group ddl_ops
499+
* @since 1.3.0
500+
*/
501+
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
502+
def createExternalTable(
503+
tableName: String,
504+
path: String,
505+
source: String): DataFrame = {
506+
sparkSession.catalog.createTable(tableName, path, source)
507+
}
508+
509+
/**
510+
* Creates an external table from the given path based on a data source and a set of options.
511+
* Then, returns the corresponding DataFrame.
512+
*
513+
* @group ddl_ops
514+
* @since 1.3.0
515+
*/
516+
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
517+
def createExternalTable(
518+
tableName: String,
519+
source: String,
520+
options: java.util.Map[String, String]): DataFrame = {
521+
sparkSession.catalog.createTable(tableName, source, options)
522+
}
523+
524+
/**
525+
* (Scala-specific)
526+
* Creates an external table from the given path based on a data source and a set of options.
527+
* Then, returns the corresponding DataFrame.
528+
*
529+
* @group ddl_ops
530+
* @since 1.3.0
531+
*/
532+
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
533+
def createExternalTable(
534+
tableName: String,
535+
source: String,
536+
options: Map[String, String]): DataFrame = {
537+
sparkSession.catalog.createTable(tableName, source, options)
538+
}
539+
540+
/**
541+
* Create an external table from the given path based on a data source, a schema and
542+
* a set of options. Then, returns the corresponding DataFrame.
543+
*
544+
* @group ddl_ops
545+
* @since 1.3.0
546+
*/
547+
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
548+
def createExternalTable(
549+
tableName: String,
550+
source: String,
551+
schema: StructType,
552+
options: java.util.Map[String, String]): DataFrame = {
553+
sparkSession.catalog.createTable(tableName, source, schema, options)
554+
}
555+
556+
/**
557+
* (Scala-specific)
558+
* Create an external table from the given path based on a data source, a schema and
559+
* a set of options. Then, returns the corresponding DataFrame.
560+
*
561+
* @group ddl_ops
562+
* @since 1.3.0
563+
*/
564+
@deprecated("use sparkSession.catalog.createTable instead.", "2.2.0")
565+
def createExternalTable(
566+
tableName: String,
567+
source: String,
568+
schema: StructType,
569+
options: Map[String, String]): DataFrame = {
570+
sparkSession.catalog.createTable(tableName, source, schema, options)
571+
}
572+
482573
/**
483574
* Registers the given `DataFrame` as a temporary table in the catalog. Temporary tables exist
484575
* only during the lifetime of this instance of SQLContext.

0 commit comments

Comments
 (0)