
Commit bfd3ee9

vlyubin authored and marmbrus committed
1 parent 6cd7058

[SPARK-6124] Support jdbc connection properties in OPTIONS part of the query

One more thing: if this PR is considered OK, it might make sense to add extra .jdbc() APIs to SQLContext that take Properties.

Author: Volodymyr Lyubinets <[email protected]>

Closes apache#4859 from vlyubin/jdbcProperties and squashes the following commits:

7a8cfda [Volodymyr Lyubinets] Support jdbc connection properties in OPTIONS part of the query
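
As a usage sketch (not part of the commit, and modeled on the new test cases): with this change, every key/value pair in the OPTIONS clause — not just url and dbtable — is copied into a java.util.Properties object and passed to DriverManager.getConnection. The H2 URL, table name, and credentials below are illustrative.

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    // Assumes an existing SparkContext `sc` and an H2 database that
    // requires the given credentials.
    val sc: SparkContext = ???
    val sqlContext = new SQLContext(sc)

    // `user` and `password` are forwarded to the JDBC driver as connection
    // properties when the relation's schema is resolved and when it is scanned.
    sqlContext.sql(
      """
        |CREATE TEMPORARY TABLE people
        |USING org.apache.spark.sql.jdbc
        |OPTIONS (url 'jdbc:h2:mem:testdb0', dbtable 'TEST.PEOPLE',
        |  user 'testUser', password 'testPass')
      """.stripMargin.replaceAll("\n", " "))

    sqlContext.sql("SELECT * FROM people").collect()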

File tree: 3 files changed (+59, -29 lines)

sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala

Lines changed: 8 additions & 6 deletions

@@ -18,6 +18,7 @@
 package org.apache.spark.sql.jdbc

 import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException}
+import java.util.Properties

 import org.apache.commons.lang.StringEscapeUtils.escapeSql
 import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
@@ -90,9 +91,9 @@ private[sql] object JDBCRDD extends Logging {
   * @throws SQLException if the table specification is garbage.
   * @throws SQLException if the table contains an unsupported type.
   */
-  def resolveTable(url: String, table: String): StructType = {
+  def resolveTable(url: String, table: String, properties: Properties): StructType = {
     val quirks = DriverQuirks.get(url)
-    val conn: Connection = DriverManager.getConnection(url)
+    val conn: Connection = DriverManager.getConnection(url, properties)
     try {
       val rs = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0").executeQuery()
       try {
@@ -147,7 +148,7 @@ private[sql] object JDBCRDD extends Logging {
   *
   * @return A function that loads the driver and connects to the url.
   */
-  def getConnector(driver: String, url: String): () => Connection = {
+  def getConnector(driver: String, url: String, properties: Properties): () => Connection = {
     () => {
       try {
         if (driver != null) Class.forName(driver)
@@ -156,7 +157,7 @@ private[sql] object JDBCRDD extends Logging {
           logWarning(s"Couldn't find class $driver", e);
         }
       }
-      DriverManager.getConnection(url)
+      DriverManager.getConnection(url, properties)
     }
   }
   /**
@@ -179,6 +180,7 @@ private[sql] object JDBCRDD extends Logging {
       schema: StructType,
       driver: String,
       url: String,
+      properties: Properties,
       fqTable: String,
       requiredColumns: Array[String],
       filters: Array[Filter],
@@ -189,7 +191,7 @@ private[sql] object JDBCRDD extends Logging {
     return new
       JDBCRDD(
         sc,
-        getConnector(driver, url),
+        getConnector(driver, url, properties),
         prunedSchema,
         fqTable,
         requiredColumns,
@@ -361,7 +363,7 @@ private[sql] class JDBCRDD(
         var ans = 0L
         var j = 0
         while (j < bytes.size) {
-          ans = 256*ans + (255 & bytes(j))
+          ans = 256 * ans + (255 & bytes(j))
           j = j + 1;
         }
         mutableRow.setLong(i, ans)
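
A small sketch of how the threaded-through properties reach the driver. Note that JDBCRDD is private[sql], so this is only callable from code inside org.apache.spark.sql; the driver class, URL, and credentials are illustrative.

    import java.sql.Connection
    import java.util.Properties

    // The same shape of Properties object that resolveTable and getConnector
    // now accept and forward to DriverManager.getConnection.
    val props = new Properties()
    props.setProperty("user", "testUser")       // illustrative credentials
    props.setProperty("password", "testPass")

    // getConnector returns a () => Connection closure; each invocation loads
    // the driver class (if non-null) and opens a connection with `props`.
    val connect: () => Connection =
      JDBCRDD.getConnector("org.h2.Driver", "jdbc:h2:mem:testdb0", props)
    val conn = connect()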

sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala

Lines changed: 12 additions & 7 deletions

@@ -17,16 +17,17 @@

 package org.apache.spark.sql.jdbc

-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.types.StructType
+import java.sql.DriverManager
+import java.util.Properties

 import scala.collection.mutable.ArrayBuffer
-import java.sql.DriverManager

 import org.apache.spark.Partition
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType

 /**
  * Data corresponding to one partition of a JDBCRDD.
@@ -115,18 +116,21 @@ private[sql] class DefaultSource extends RelationProvider {
         numPartitions.toInt)
     }
     val parts = JDBCRelation.columnPartition(partitionInfo)
-    JDBCRelation(url, table, parts)(sqlContext)
+    val properties = new Properties() // Additional properties that we will pass to getConnection
+    parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
+    JDBCRelation(url, table, parts, properties)(sqlContext)
   }
 }

 private[sql] case class JDBCRelation(
     url: String,
     table: String,
-    parts: Array[Partition])(@transient val sqlContext: SQLContext)
+    parts: Array[Partition],
+    properties: Properties = new Properties())(@transient val sqlContext: SQLContext)
   extends BaseRelation
   with PrunedFilteredScan {

-  override val schema: StructType = JDBCRDD.resolveTable(url, table)
+  override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)

   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
     val driver: String = DriverManager.getDriver(url).getClass.getCanonicalName
@@ -135,6 +139,7 @@ private[sql] case class JDBCRelation(
       schema,
       driver,
       url,
+      properties,
       table,
       requiredColumns,
       filters,
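
One detail worth noting in DefaultSource above: the copy is indiscriminate, so url, dbtable, and the partitioning options themselves also land in the Properties object alongside any driver-specific keys. A standalone sketch of that copying idiom (the parameter map is illustrative):

    import java.util.Properties

    // Mirrors DefaultSource: every OPTIONS entry is copied into the
    // Properties object later handed to DriverManager.getConnection.
    val parameters = Map(
      "url"      -> "jdbc:h2:mem:testdb0",
      "dbtable"  -> "TEST.PEOPLE",
      "user"     -> "testUser",
      "password" -> "testPass")

    val properties = new Properties()
    parameters.foreach(kv => properties.setProperty(kv._1, kv._2))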

sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala

Lines changed: 39 additions & 16 deletions

@@ -19,22 +19,31 @@ package org.apache.spark.sql.jdbc

 import java.math.BigDecimal
 import java.sql.DriverManager
-import java.util.{Calendar, GregorianCalendar}
+import java.util.{Calendar, GregorianCalendar, Properties}

 import org.apache.spark.sql.test._
+import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{FunSuite, BeforeAndAfter}
 import TestSQLContext._
 import TestSQLContext.implicits._

 class JDBCSuite extends FunSuite with BeforeAndAfter {
   val url = "jdbc:h2:mem:testdb0"
+  val urlWithUserAndPass = "jdbc:h2:mem:testdb0;user=testUser;password=testPass"
   var conn: java.sql.Connection = null

   val testBytes = Array[Byte](99.toByte, 134.toByte, 135.toByte, 200.toByte, 205.toByte)

   before {
     Class.forName("org.h2.Driver")
-    conn = DriverManager.getConnection(url)
+    // Extra properties that will be specified for our database. We need these to test
+    // usage of parameters from OPTIONS clause in queries.
+    val properties = new Properties()
+    properties.setProperty("user", "testUser")
+    properties.setProperty("password", "testPass")
+    properties.setProperty("rowId", "false")
+
+    conn = DriverManager.getConnection(url, properties)
     conn.prepareStatement("create schema test").executeUpdate()
     conn.prepareStatement("create table test.people (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate()
     conn.prepareStatement("insert into test.people values ('fred', 1)").executeUpdate()
@@ -46,15 +55,15 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
       s"""
         |CREATE TEMPORARY TABLE foobar
         |USING org.apache.spark.sql.jdbc
-        |OPTIONS (url '$url', dbtable 'TEST.PEOPLE')
+        |OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))

     sql(
       s"""
         |CREATE TEMPORARY TABLE parts
         |USING org.apache.spark.sql.jdbc
-        |OPTIONS (url '$url', dbtable 'TEST.PEOPLE',
-        |partitionColumn 'THEID', lowerBound '1', upperBound '4', numPartitions '3')
+        |OPTIONS (url '$url', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass',
+        |  partitionColumn 'THEID', lowerBound '1', upperBound '4', numPartitions '3')
       """.stripMargin.replaceAll("\n", " "))

     conn.prepareStatement("create table test.inttypes (a INT, b BOOLEAN, c TINYINT, "
@@ -68,12 +77,12 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
       s"""
         |CREATE TEMPORARY TABLE inttypes
         |USING org.apache.spark.sql.jdbc
-        |OPTIONS (url '$url', dbtable 'TEST.INTTYPES')
+        |OPTIONS (url '$url', dbtable 'TEST.INTTYPES', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))

     conn.prepareStatement("create table test.strtypes (a BINARY(20), b VARCHAR(20), "
       + "c VARCHAR_IGNORECASE(20), d CHAR(20), e BLOB, f CLOB)").executeUpdate()
-    var stmt = conn.prepareStatement("insert into test.strtypes values (?, ?, ?, ?, ?, ?)")
+    val stmt = conn.prepareStatement("insert into test.strtypes values (?, ?, ?, ?, ?, ?)")
     stmt.setBytes(1, testBytes)
     stmt.setString(2, "Sensitive")
     stmt.setString(3, "Insensitive")
@@ -85,7 +94,7 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
       s"""
         |CREATE TEMPORARY TABLE strtypes
         |USING org.apache.spark.sql.jdbc
-        |OPTIONS (url '$url', dbtable 'TEST.STRTYPES')
+        |OPTIONS (url '$url', dbtable 'TEST.STRTYPES', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))

     conn.prepareStatement("create table test.timetypes (a TIME, b DATE, c TIMESTAMP)"
@@ -97,7 +106,7 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
       s"""
         |CREATE TEMPORARY TABLE timetypes
         |USING org.apache.spark.sql.jdbc
-        |OPTIONS (url '$url', dbtable 'TEST.TIMETYPES')
+        |OPTIONS (url '$url', dbtable 'TEST.TIMETYPES', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))


@@ -112,7 +121,7 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
       s"""
         |CREATE TEMPORARY TABLE flttypes
         |USING org.apache.spark.sql.jdbc
-        |OPTIONS (url '$url', dbtable 'TEST.FLTTYPES')
+        |OPTIONS (url '$url', dbtable 'TEST.FLTTYPES', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))

     // Untested: IDENTITY, OTHER, UUID, ARRAY, and GEOMETRY types.
@@ -174,16 +183,17 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
   }

   test("Basic API") {
-    assert(TestSQLContext.jdbc(url, "TEST.PEOPLE").collect.size == 3)
+    assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE").collect.size == 3)
   }

   test("Partitioning via JDBCPartitioningInfo API") {
-    assert(TestSQLContext.jdbc(url, "TEST.PEOPLE", "THEID", 0, 4, 3).collect.size == 3)
+    assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3)
+      .collect.size == 3)
   }

   test("Partitioning via list-of-where-clauses API") {
     val parts = Array[String]("THEID < 2", "THEID >= 2")
-    assert(TestSQLContext.jdbc(url, "TEST.PEOPLE", parts).collect.size == 3)
+    assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts).collect.size == 3)
   }

   test("H2 integral types") {
@@ -216,7 +226,6 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
     assert(rows(0).getString(5).equals("I am a clob!"))
   }

-
   test("H2 time types") {
     val rows = sql("SELECT * FROM timetypes").collect()
     val cal = new GregorianCalendar(java.util.Locale.ROOT)
@@ -246,17 +255,31 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
       .equals(new BigDecimal("123456789012345.54321543215432100000")))
   }

-
   test("SQL query as table name") {
     sql(
       s"""
         |CREATE TEMPORARY TABLE hack
         |USING org.apache.spark.sql.jdbc
-        |OPTIONS (url '$url', dbtable '(SELECT B, B*B FROM TEST.FLTTYPES)')
+        |OPTIONS (url '$url', dbtable '(SELECT B, B*B FROM TEST.FLTTYPES)',
+        |  user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))
     val rows = sql("SELECT * FROM hack").collect()
     assert(rows(0).getDouble(0) == 1.00000011920928955) // Yes, I meant ==.
     // For some reason, H2 computes this square incorrectly...
     assert(math.abs(rows(0).getDouble(1) - 1.00000023841859331) < 1e-12)
   }
+
+  test("Pass extra properties via OPTIONS") {
+    // We set rowId to false during setup, which means that _ROWID_ column should be absent from
+    // all tables. If rowId is true (default), the query below doesn't throw an exception.
+    intercept[JdbcSQLException] {
+      sql(
+        s"""
+          |CREATE TEMPORARY TABLE abc
+          |USING org.apache.spark.sql.jdbc
+          |OPTIONS (url '$url', dbtable '(SELECT _ROWID_ FROM test.people)',
+          |  user 'testUser', password 'testPass')
+        """.stripMargin.replaceAll("\n", " "))
+    }
+  }
 }
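
For reference, the rowId=false behavior the new test relies on can be reproduced with plain JDBC against H2 — a sketch under the same setup assumptions as the suite (H2 driver on the classpath, same illustrative URL and credentials):

    import java.sql.DriverManager
    import java.util.Properties

    Class.forName("org.h2.Driver")
    val props = new Properties()
    props.setProperty("user", "testUser")
    props.setProperty("password", "testPass")
    props.setProperty("rowId", "false") // disable the implicit _ROWID_ column

    val conn = DriverManager.getConnection("jdbc:h2:mem:testdb0", props)
    // With rowId=false the _ROWID_ pseudo-column is absent, so this query
    // throws org.h2.jdbc.JdbcSQLException, which is what the test asserts.
    conn.prepareStatement("SELECT _ROWID_ FROM test.people").executeQuery()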
