Skip to content

Commit 9df388d

Browse files
committed
move check into PreWriteCheck
1 parent 847ab20 commit 9df388d

File tree

3 files changed

+25
-20
lines changed

3 files changed

+25
-20
lines changed

sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
112112
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
113113
execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil
114114

115-
case logical.InsertIntoTable(t, _, _, _, _) =>
116-
throw new QueryPlanningException(s"""
117-
| Attempt to insert into a RDD-based table: ${t.simpleString}, which is immutable.
118-
| Save the RDD to a data source and register the data source as a table before insertion.
119-
""".stripMargin)
120-
121115
case _ => Nil
122116
}
123117

@@ -399,5 +393,3 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
399393
filters.flatMap(translate)
400394
}
401395
}
402-
403-
class QueryPlanningException(message: String) extends Exception(message)

sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.sql.sources
1919

20+
import org.apache.spark.sql.execution.LogicalRDD
2021
import org.apache.spark.sql.{SaveMode, AnalysisException}
2122
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog}
2223
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias}
@@ -119,6 +120,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
119120
// The relation in l is not an InsertableRelation.
120121
failAnalysis(s"$l does not allow insertion.")
121122

123+
case logical.InsertIntoTable(t, _, _, _, _) =>
124+
failAnalysis(
125+
s"Attempt to insert into a RDD-based table: ${t.simpleString} which is immutable.")
126+
122127
case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
123128
// When the SaveMode is Overwrite, we need to check if the table is an input table of
124129
// the query. If so, we will throw an AnalysisException to let users know it is not allowed.

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,12 @@
1717

1818
package org.apache.spark.sql
1919

20-
import org.apache.spark.sql.sources.QueryPlanningException
21-
2220
import scala.language.postfixOps
2321

2422
import org.apache.spark.sql.functions._
2523
import org.apache.spark.sql.types._
2624
import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint}
2725

28-
2926
class DataFrameSuite extends QueryTest {
3027
import org.apache.spark.sql.TestData._
3128

@@ -764,24 +761,35 @@ class DataFrameSuite extends QueryTest {
764761
assert(!f.getMessage.contains("column2"))
765762
}
766763

767-
test("SPARK-6941: Better Error Message for inserting into RDD-based Table") {
764+
test("SPARK-6941: Better error message for inserting into RDD-based Table") {
768765
val df = Seq(Tuple1(1)).toDF("col")
769-
df.registerTempTable("rdd_base")
766+
val insertion = Seq(Tuple1(2)).toDF("col")
770767

771-
df.write.parquet("tmp_parquet")
768+
// pass case: parquet table (HadoopFsRelation)
769+
df.write.mode(SaveMode.Overwrite).parquet("tmp_parquet")
772770
val pdf = ctx.read.parquet("tmp_parquet")
773771
pdf.registerTempTable("parquet_base")
772+
insertion.write.insertInto("parquet_base")
774773

775-
df.write.json("tmp_json")
774+
// pass case: json table (InsertableRelation)
775+
df.write.mode(SaveMode.Overwrite).json("tmp_json")
776776
val jdf = ctx.read.json("tmp_json")
777777
jdf.registerTempTable("json_base")
778+
insertion.write.mode(SaveMode.Overwrite).insertInto("json_base")
778779

779-
val insertion = Seq(Tuple1(2)).toDF("col")
780-
val e = intercept[QueryPlanningException] {
780+
// error cases: insert into a RDD
781+
df.registerTempTable("rdd_base")
782+
val e1 = intercept[AnalysisException] {
781783
insertion.write.insertInto("rdd_base")
782784
}
783-
assert(e.getMessage.contains("Attempt to insert into a RDD-based table"))
784-
insertion.write.insertInto("parquet_base")
785-
insertion.write.mode(SaveMode.Overwrite).insertInto("json_base")
785+
assert(e1.getMessage.contains("Attempt to insert into a RDD-based table"))
786+
787+
// error case: insert into a RDD based on data source
788+
val indirectDS = pdf.select("col").filter($"col" > 5)
789+
indirectDS.registerTempTable("indirect_ds")
790+
val e2 = intercept[AnalysisException] {
791+
insertion.write.insertInto("indirect_ds")
792+
}
793+
assert(e2.getMessage.contains("Attempt to insert into a RDD-based table"))
786794
}
787795
}

0 commit comments

Comments (0)