Skip to content

Commit 43dac2c

Browse files
yjshen authored and yhuai committed
[SPARK-6941] [SQL] Provide a better error message to when inserting into RDD based table
JIRA: https://issues.apache.org/jira/browse/SPARK-6941 Author: Yijie Shen <[email protected]> Closes apache#7342 from yijieshen/SPARK-6941 and squashes the following commits: f82cbe7 [Yijie Shen] reorder import dd67e40 [Yijie Shen] resolve comments 09518af [Yijie Shen] fix import order in DataframeSuite 0c635d4 [Yijie Shen] make match more specific 9df388d [Yijie Shen] move check into PreWriteCheck 847ab20 [Yijie Shen] Detect insertion error in DataSourceStrategy
1 parent b536d5d commit 43dac2c

File tree

2 files changed

+60
-4
lines changed

2 files changed

+60
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{SaveMode, AnalysisException}
2121
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog}
2222
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias}
2323
import org.apache.spark.sql.catalyst.plans.logical
24-
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
24+
import org.apache.spark.sql.catalyst.plans.logical._
2525
import org.apache.spark.sql.catalyst.rules.Rule
2626
import org.apache.spark.sql.types.DataType
2727

@@ -119,6 +119,13 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
119119
// The relation in l is not an InsertableRelation.
120120
failAnalysis(s"$l does not allow insertion.")
121121

122+
case logical.InsertIntoTable(t, _, _, _, _) =>
123+
if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) {
124+
failAnalysis(s"Inserting into an RDD-based table is not allowed.")
125+
} else {
126+
// OK
127+
}
128+
122129
case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
123130
// When the SaveMode is Overwrite, we need to check if the table is an input table of
124131
// the query. If so, we will throw an AnalysisException to let users know it is not allowed.

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,23 @@
1717

1818
package org.apache.spark.sql
1919

20+
import java.io.File
21+
2022
import scala.language.postfixOps
2123

24+
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
2225
import org.apache.spark.sql.functions._
2326
import org.apache.spark.sql.types._
24-
import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint}
25-
27+
import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils}
2628

27-
class DataFrameSuite extends QueryTest {
29+
class DataFrameSuite extends QueryTest with SQLTestUtils {
2830
import org.apache.spark.sql.TestData._
2931

3032
lazy val ctx = org.apache.spark.sql.test.TestSQLContext
3133
import ctx.implicits._
3234

35+
def sqlContext: SQLContext = ctx
36+
3337
test("analysis error should be eagerly reported") {
3438
val oldSetting = ctx.conf.dataFrameEagerAnalysis
3539
// Eager analysis.
@@ -761,4 +765,49 @@ class DataFrameSuite extends QueryTest {
761765
assert(f.getMessage.contains("column3"))
762766
assert(!f.getMessage.contains("column2"))
763767
}
768+
769+
test("SPARK-6941: Better error message for inserting into RDD-based Table") {
770+
withTempDir { dir =>
771+
772+
val tempParquetFile = new File(dir, "tmp_parquet")
773+
val tempJsonFile = new File(dir, "tmp_json")
774+
775+
val df = Seq(Tuple1(1)).toDF()
776+
val insertion = Seq(Tuple1(2)).toDF("col")
777+
778+
// pass case: parquet table (HadoopFsRelation)
779+
df.write.mode(SaveMode.Overwrite).parquet(tempParquetFile.getCanonicalPath)
780+
val pdf = ctx.read.parquet(tempParquetFile.getCanonicalPath)
781+
pdf.registerTempTable("parquet_base")
782+
insertion.write.insertInto("parquet_base")
783+
784+
// pass case: json table (InsertableRelation)
785+
df.write.mode(SaveMode.Overwrite).json(tempJsonFile.getCanonicalPath)
786+
val jdf = ctx.read.json(tempJsonFile.getCanonicalPath)
787+
jdf.registerTempTable("json_base")
788+
insertion.write.mode(SaveMode.Overwrite).insertInto("json_base")
789+
790+
// error cases: insert into an RDD
791+
df.registerTempTable("rdd_base")
792+
val e1 = intercept[AnalysisException] {
793+
insertion.write.insertInto("rdd_base")
794+
}
795+
assert(e1.getMessage.contains("Inserting into an RDD-based table is not allowed."))
796+
797+
// error case: insert into a logical plan that is not a LeafNode
798+
val indirectDS = pdf.select("_1").filter($"_1" > 5)
799+
indirectDS.registerTempTable("indirect_ds")
800+
val e2 = intercept[AnalysisException] {
801+
insertion.write.insertInto("indirect_ds")
802+
}
803+
assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed."))
804+
805+
// error case: insert into an OneRowRelation
806+
new DataFrame(ctx, OneRowRelation).registerTempTable("one_row")
807+
val e3 = intercept[AnalysisException] {
808+
insertion.write.insertInto("one_row")
809+
}
810+
assert(e3.getMessage.contains("Inserting into an RDD-based table is not allowed."))
811+
}
812+
}
764813
}

0 commit comments

Comments (0)