Skip to content

Commit 847ab20

Browse files
committed
Detect insertion error in DataSourceStrategy
1 parent 408b384 commit 847ab20

File tree

2 files changed

+31
-0
lines changed

2 files changed

+31
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
112112
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
113113
execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil
114114

115+
case logical.InsertIntoTable(t, _, _, _, _) =>
116+
throw new QueryPlanningException(s"""
117+
| Attempt to insert into a RDD-based table: ${t.simpleString}, which is immutable.
118+
| Save the RDD to a data source and register the data source as a table before insertion.
119+
""".stripMargin)
120+
115121
case _ => Nil
116122
}
117123

@@ -393,3 +399,5 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
393399
filters.flatMap(translate)
394400
}
395401
}
402+
403+
/**
 * Exception carrying a human-readable explanation of why a query could not be planned.
 *
 * Within this change it is thrown by `DataSourceStrategy` when an `InsertIntoTable` node is not
 * matched by any of the preceding insertion cases.
 *
 * @param message description of the planning failure, exposed through `getMessage`
 */
class QueryPlanningException(message: String)
  extends Exception(message)

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql
1919

20+
import org.apache.spark.sql.sources.QueryPlanningException
21+
2022
import scala.language.postfixOps
2123

2224
import org.apache.spark.sql.functions._
@@ -761,4 +763,25 @@ class DataFrameSuite extends QueryTest {
761763
assert(f.getMessage.contains("column3"))
762764
assert(!f.getMessage.contains("column2"))
763765
}
766+
767+
test("SPARK-6941: Better Error Message for inserting into RDD-based Table") {
  // Fixture files live under a fresh temporary directory rather than fixed paths relative to the
  // working directory: the default save mode refuses to write to an existing path, so leftover
  // "tmp_parquet"/"tmp_json" directories would make every subsequent run of this test fail.
  val tmpDir = java.nio.file.Files.createTempDirectory("spark-6941").toFile

  // Removes a file or directory tree; `listFiles` returns null for non-directories.
  def deleteRecursively(file: java.io.File): Unit = {
    Option(file.listFiles).foreach(_.foreach(deleteRecursively))
    file.delete()
  }

  try {
    val parquetPath = new java.io.File(tmpDir, "tmp_parquet").getCanonicalPath
    val jsonPath = new java.io.File(tmpDir, "tmp_json").getCanonicalPath

    // Table backed directly by an in-memory DataFrame: insertion must be rejected.
    val df = Seq(Tuple1(1)).toDF("col")
    df.registerTempTable("rdd_base")

    // Table backed by Parquet files: insertion must succeed.
    df.write.parquet(parquetPath)
    val pdf = ctx.read.parquet(parquetPath)
    pdf.registerTempTable("parquet_base")

    // Table backed by JSON files: overwriting insertion must succeed.
    df.write.json(jsonPath)
    val jdf = ctx.read.json(jsonPath)
    jdf.registerTempTable("json_base")

    val insertion = Seq(Tuple1(2)).toDF("col")

    val e = intercept[QueryPlanningException] {
      insertion.write.insertInto("rdd_base")
    }
    assert(e.getMessage.contains("Attempt to insert into a RDD-based table"))

    insertion.write.insertInto("parquet_base")
    insertion.write.mode(SaveMode.Overwrite).insertInto("json_base")
  } finally {
    // Always clean up the on-disk fixtures, even when an assertion above fails.
    deleteRecursively(tmpDir)
  }
}
764787
}

0 commit comments

Comments
 (0)