Commit f3fb5f1

Support for JDBC Datasource InsertableRelation
1 parent 98195c3 commit f3fb5f1
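
This change mixes InsertableRelation into JDBCRelation, making a JDBC-backed temporary table a valid target for SQL INSERT INTO and INSERT OVERWRITE statements. A minimal sketch of the flow this enables, modeled directly on the test suite below (TestSQLContext, url1, and the credentials come from that suite's H2 setup):

    // Register a JDBC table through the data sources API...
    TestSQLContext.sql(
      s"""
        |CREATE TEMPORARY TABLE PEOPLE1
        |USING org.apache.spark.sql.jdbc
        |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE1', user 'testUser', password 'testPass')
      """.stripMargin.replaceAll("\n", " "))

    // ...then insert into it with plain SQL. Before this commit the statement
    // was rejected because JDBCRelation did not implement InsertableRelation;
    // it is now planned through JDBCRelation.insert(data, overwrite = false).
    TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")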

File tree: 2 files changed, +43 −2 lines changed

sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala

Lines changed: 7 additions & 1 deletion
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.sources._
@@ -129,7 +130,8 @@ private[sql] case class JDBCRelation(
     parts: Array[Partition],
     properties: Properties = new Properties())(@transient val sqlContext: SQLContext)
   extends BaseRelation
-  with PrunedFilteredScan {
+  with PrunedFilteredScan
+  with InsertableRelation {
 
   override val needConversion: Boolean = false
 
@@ -148,4 +150,8 @@ private[sql] case class JDBCRelation(
       filters,
       parts)
   }
+
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    data.insertIntoJDBC(url, table, overwrite, properties)
+  }
 }
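
For reference, InsertableRelation is a single-method trait in the org.apache.spark.sql.sources data sources API that JDBCRelation now mixes in; this is a paraphrase of that trait for context, not part of the diff:

    // org.apache.spark.sql.sources: the contract JDBCRelation now fulfills.
    // The planner materializes the query result as `data` and derives the
    // overwrite flag from INSERT INTO (false) vs. INSERT OVERWRITE (true).
    trait InsertableRelation {
      def insert(data: DataFrame, overwrite: Boolean): Unit
    }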

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala

Lines changed: 36 additions & 1 deletion
@@ -43,6 +43,29 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
 
     conn1 = DriverManager.getConnection(url1, properties)
     conn1.prepareStatement("create schema test").executeUpdate()
+    conn1.prepareStatement("drop table if exists test.people").executeUpdate()
+    conn1.prepareStatement(
+      "create table test.people (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate()
+    conn1.prepareStatement("insert into test.people values ('fred', 1)").executeUpdate()
+    conn1.prepareStatement("insert into test.people values ('mary', 2)").executeUpdate()
+    conn1.prepareStatement("drop table if exists test.people1").executeUpdate()
+    conn1.prepareStatement(
+      "create table test.people1 (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate()
+    conn1.commit()
+
+    TestSQLContext.sql(
+      s"""
+        |CREATE TEMPORARY TABLE PEOPLE
+        |USING org.apache.spark.sql.jdbc
+        |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass')
+      """.stripMargin.replaceAll("\n", " "))
+
+    TestSQLContext.sql(
+      s"""
+        |CREATE TEMPORARY TABLE PEOPLE1
+        |USING org.apache.spark.sql.jdbc
+        |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE1', user 'testUser', password 'testPass')
+      """.stripMargin.replaceAll("\n", " "))
   }
 
   after {
@@ -114,5 +137,17 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
       df2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true)
     }
   }
-
+
+  test("INSERT to JDBC Datasource") {
+    TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
+    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
+    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
+  }
+
+  test("INSERT to JDBC Datasource with overwrite") {
+    TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
+    TestSQLContext.sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE")
+    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
+    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
+  }
 }
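
Note that JDBCRelation.insert delegates to DataFrame.insertIntoJDBC, a write path this suite already exercises, so the SQL statements tested above should be equivalent to the direct call sketched here (reusing the suite's url1 and properties):

    // Direct-API equivalent of "INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE":
    // overwrite = false appends, mirroring plain INSERT INTO semantics.
    val people = TestSQLContext.jdbc(url1, "TEST.PEOPLE", properties)
    people.insertIntoJDBC(url1, "TEST.PEOPLE1", false, properties)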
