Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.sources._
Expand Down Expand Up @@ -129,7 +130,8 @@ private[sql] case class JDBCRelation(
parts: Array[Partition],
properties: Properties = new Properties())(@transient val sqlContext: SQLContext)
extends BaseRelation
with PrunedFilteredScan {
with PrunedFilteredScan
with InsertableRelation {

override val needConversion: Boolean = false

Expand All @@ -148,4 +150,8 @@ private[sql] case class JDBCRelation(
filters,
parts)
}

override def insert(data: DataFrame, overwrite: Boolean): Unit = {
data.insertIntoJDBC(url, table, overwrite, properties)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,29 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {

conn1 = DriverManager.getConnection(url1, properties)
conn1.prepareStatement("create schema test").executeUpdate()
conn1.prepareStatement("drop table if exists test.people").executeUpdate()
conn1.prepareStatement(
"create table test.people (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate()
conn1.prepareStatement("insert into test.people values ('fred', 1)").executeUpdate()
conn1.prepareStatement("insert into test.people values ('mary', 2)").executeUpdate()
conn1.prepareStatement("drop table if exists test.people1").executeUpdate()
conn1.prepareStatement(
"create table test.people1 (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate()
conn1.commit()

TestSQLContext.sql(
s"""
|CREATE TEMPORARY TABLE PEOPLE
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url1', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))

TestSQLContext.sql(
s"""
|CREATE TEMPORARY TABLE PEOPLE1
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$url1', dbtable 'TEST.PEOPLE1', user 'testUser', password 'testPass')
""".stripMargin.replaceAll("\n", " "))
}

after {
Expand Down Expand Up @@ -114,5 +137,17 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
df2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true)
}
}


test("INSERT to JDBC Datasource") {
TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
}

test("INSERT to JDBC Datasource with overwrite") {
TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
TestSQLContext.sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE")
assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
}
}