
Commit 27d97e5

address comments

1 parent 4e06830

File tree: 4 files changed, +29 −25 lines


sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
Lines changed: 9 additions & 5 deletions

@@ -28,6 +28,7 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.InferSchema
@@ -143,6 +144,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   @scala.annotation.varargs
   def load(paths: String*): DataFrame = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "read files of Hive data source directly.")
+    }
+
     sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,
@@ -160,7 +166,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    */
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
     // properties should override settings in extraOptions.
-    this.extraOptions = this.extraOptions ++ properties.asScala
+    this.extraOptions ++= properties.asScala
     // explicit url and dbtable should override all
     this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
     format("jdbc").load()
@@ -469,9 +475,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @since 1.4.0
    */
   def table(tableName: String): DataFrame = {
-    Dataset.ofRows(sparkSession,
-      sparkSession.sessionState.catalog.lookupRelation(
-        sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)))
+    sparkSession.table(tableName)
   }

   /**
@@ -550,6 +554,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

   private var userSpecifiedSchema: Option[StructType] = None

-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private val extraOptions = new scala.collection.mutable.HashMap[String, String]

 }
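A side note on the extraOptions change above: because extraOptions refers to a scala.collection.mutable.HashMap, ++= updates the map in place, so the binding can become a val and the old reassignment (this.extraOptions = this.extraOptions ++ ...) is unnecessary. A minimal standalone illustration of that Scala behavior (not part of the commit; the option keys below are made up):

  import scala.collection.mutable

  val opts = new mutable.HashMap[String, String]
  opts ++= Map("url" -> "jdbc:h2:mem:test")   // in-place update; legal on a val
  opts += ("dbtable" -> "people")             // same for +=, as used in jdbc()
  assert(opts.size == 2)                      // both entries are now in the map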

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
Lines changed: 6 additions & 0 deletions

@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
@@ -204,6 +205,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def save(): Unit = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "write files of Hive data source directly.")
+    }
+
     assertNotBucketed("save")
     val dataSource = DataSource(
       df.sparkSession,
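Taken together, the two guards above make format("hive") unusable for direct file reads and writes: the error surfaces before data source resolution or the bucketing check. A rough sketch of the user-facing effect in a Spark shell (the path is illustrative; the messages are the ones added in this commit):

  import spark.implicits._   // for toDF on a local Seq (implicit in spark-shell)

  spark.read.format("hive").load("/tmp/some/dir")
  // => AnalysisException: Hive data source can only be used with tables, you can not
  //    read files of Hive data source directly.

  Seq(1 -> "a").toDF("i", "j").write.format("hive").save("/tmp/some/dir")
  // => AnalysisException: Hive data source can only be used with tables, you can not
  //    write files of Hive data source directly.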

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
Lines changed: 0 additions & 20 deletions

@@ -1169,26 +1169,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }

-  test("save API - format hive") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      val e = intercept[ClassNotFoundException] {
-        spark.range(10).write.format("hive").mode(SaveMode.Ignore).save(path)
-      }.getMessage
-      assert(e.contains("Failed to find data source: hive"))
-    }
-  }
-
-  test("saveAsTable API - format hive") {
-    val tableName = "tab1"
-    withTable(tableName) {
-      val e = intercept[AnalysisException] {
-        spark.range(10).write.format("hive").mode(SaveMode.Overwrite).saveAsTable(tableName)
-      }.getMessage
-      assert(e.contains("Cannot create hive serde table with saveAsTable API"))
-    }
-  }
-
   test("create a temp view using hive") {
     val tableName = "tab1"
     withTable (tableName) {

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
Lines changed: 14 additions & 0 deletions

@@ -1352,4 +1352,18 @@ class HiveDDLSuite
       "CTAS for hive serde tables does not support append or overwrite semantics"))
     }
   }
+
+  test("read/write files with hive data source is not allowed") {
+    withTempDir { dir =>
+      val e = intercept[AnalysisException] {
+        spark.read.format("hive").load(dir.getAbsolutePath)
+      }
+      assert(e.message.contains("Hive data source can only be used with tables"))
+
+      val e2 = intercept[AnalysisException] {
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").save(dir.getAbsolutePath)
+      }
+      assert(e2.message.contains("Hive data source can only be used with tables"))
+    }
+  }
 }
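As a usage note, and an assumption drawn from the error message rather than anything this commit adds: the table-based route the message points to is the catalog API, e.g. on a SparkSession built with enableHiveSupport() and with existing Hive tables. The table names below are hypothetical:

  val hiveDf = spark.table("src")        // read a Hive table through the catalog
                                         // (the same path DataFrameReader.table now delegates to)
  hiveDf.write.insertInto("src_backup")  // write into another existing Hive table, not into a raw path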
