@@ -192,7 +192,7 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     checkRow(sql("SELECT * FROM datetime1 where id = 1").head())
   }

-  test("SPARK-20557: column type TIMEZONE with TIME STAMP should be recognized") {
+  test("SPARK-20557: column type TIMESTAMP with TIME ZONE should be recognized") {
     val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
     val rows = dfRead.collect()
     val types = rows(0).toSeq.map(x => x.getClass.toString)
@@ -55,6 +55,13 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       + "null, null, null, null, null, "
       + "null, null, null, null, null, null, null)"
     ).executeUpdate()
+
+    conn.prepareStatement("CREATE TABLE ts_with_timezone " +
+      "(id integer, tstz TIMESTAMP WITH TIME ZONE, ttz TIME WITH TIME ZONE)")
+      .executeUpdate()
+    conn.prepareStatement("INSERT INTO ts_with_timezone VALUES " +
+      "(1, TIMESTAMP WITH TIME ZONE '2016-08-12 10:22:31.949271-07', TIME WITH TIME ZONE '17:22:31.949271+00')")
+      .executeUpdate()
   }

   test("Type mapping for various types") {
@@ -126,4 +133,12 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(schema(0).dataType == FloatType)
     assert(schema(1).dataType == ShortType)
   }
+
+  test("SPARK-20557: column type TIMESTAMP with TIME ZONE and TIME with TIME ZONE should be recognized") {
+    val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
+    val rows = dfRead.collect()
+    val types = rows(0).toSeq.map(x => x.getClass.toString)
+    assert(types(1).equals("class java.sql.Timestamp"))
+    assert(types(2).equals("class java.sql.Timestamp"))
+  }
 }
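For context, a rough sketch of what the new mapping buys on the Postgres side (my example, not part of the PR; it assumes a reachable server holding the ts_with_timezone table above, a SparkSession named `spark`, and a placeholder URL): schema inference now succeeds and both time-zone-aware columns surface as Catalyst TimestampType.

import java.util.Properties

// Hypothetical connection string; adjust host, database, and credentials.
val url = "jdbc:postgresql://localhost:5432/postgres?user=postgres"

// With this patch, TIMESTAMPTZ and TIMETZ both map to TimestampType,
// and the values come back as java.sql.Timestamp (as the test asserts).
val df = spark.read.jdbc(url, "ts_with_timezone", new Properties)
df.printSchema()
// root
//  |-- id: integer (nullable = true)
//  |-- tstz: timestamp (nullable = true)
//  |-- ttz: timestamp (nullable = true)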
sql/core/pom.xml (1 addition, 1 deletion)

@@ -115,7 +115,7 @@
     <dependency>
       <groupId>com.h2database</groupId>
       <artifactId>h2</artifactId>
-      <version>1.4.183</version>
+      <version>1.4.195</version>
       <scope>test</scope>
     </dependency>
     <dependency>
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution.datasources.jdbc

-import java.sql.{Connection, Driver, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
+import java.sql.{Connection, Driver, DriverManager, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
 import java.util.Locale

 import scala.collection.JavaConverters._
@@ -217,23 +217,29 @@ object JdbcUtils extends Logging {
       case java.sql.Types.OTHER => null
       case java.sql.Types.REAL => DoubleType
       case java.sql.Types.REF => StringType
+      case java.sql.Types.REF_CURSOR => null
       case java.sql.Types.ROWID => LongType
       case java.sql.Types.SMALLINT => IntegerType
       case java.sql.Types.SQLXML => StringType
       case java.sql.Types.STRUCT => StringType
       case java.sql.Types.TIME => TimestampType
+      case java.sql.Types.TIME_WITH_TIMEZONE => TimestampType
       case java.sql.Types.TIMESTAMP => TimestampType
+      case java.sql.Types.TIMESTAMP_WITH_TIMEZONE => TimestampType
+      case -101 => TimestampType // Value for Timestamp with Time Zone in Oracle
       case java.sql.Types.TINYINT => IntegerType
       case java.sql.Types.VARBINARY => BinaryType
       case java.sql.Types.VARCHAR => StringType
-      case _ => null
+      case _ =>
+        throw new SQLException("Unrecognized SQL type " + sqlType)
       // scalastyle:on
     }

-    if (answer == null) throw new SQLException("Unsupported type " + sqlType)
+    if (answer == null) {
+      throw new SQLException("Unsupported type " + JDBCType.valueOf(sqlType).getName)
+    }
     answer
   }
Review thread on the new `JDBCType.valueOf(sqlType).getName` line:

dongjoon-hyun (Member): Hi, @gatorsmile. Then it seems we need to consider the IllegalArgumentException that JDBCType.valueOf can throw. Can we do that here?

gatorsmile (Member, Author): Captured in line 236. Thanks!

dongjoon-hyun (Member, May 6, 2017): Er, I thought JDBCType.valueOf(sqlType) might throw IllegalArgumentException for an invalid sqlType.

dongjoon-hyun (Member, May 6, 2017): Oh, sorry, I was looking at old code. Forget about that.
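To ground the thread above with a quick sketch of my own (a hypothetical REPL session, not from the PR): JDBCType.valueOf(int) resolves only the standard java.sql.Types codes and throws IllegalArgumentException for vendor-specific ones such as Oracle's -101. The call in the null branch is safe because every case that yields null (NULL, OTHER, DATALINK, DISTINCT, REF_CURSOR) is a standard constant; unknown codes are rejected earlier by the new `case _` throw.

import java.sql.{JDBCType, Types}

// Standard codes resolve to a readable name for the error message:
JDBCType.valueOf(Types.TIMESTAMP_WITH_TIMEZONE).getName
// res0: String = TIMESTAMP_WITH_TIMEZONE

// Vendor codes have no JDBCType constant, so valueOf throws:
JDBCType.valueOf(-101)
// java.lang.IllegalArgumentException (no constant exists for vendor code -101)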

@@ -33,7 +33,6 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.storage.StorageLevel

-
 /**
  * Internal implementation of the user-facing `Catalog`.
  */
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala (22 additions, 2 deletions)

@@ -18,13 +18,13 @@
 package org.apache.spark.sql.jdbc

 import java.math.BigDecimal
-import java.sql.{Date, DriverManager, Timestamp}
+import java.sql.{Date, DriverManager, SQLException, Timestamp}
 import java.util.{Calendar, GregorianCalendar, Properties}

 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.DataSourceScanExec
@@ -141,6 +141,15 @@ class JDBCSuite extends SparkFunSuite
         |OPTIONS (url '$url', dbtable 'TEST.TIMETYPES', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))

+    conn.prepareStatement("CREATE TABLE test.timezone (tz TIMESTAMP WITH TIME ZONE) " +
+      "AS SELECT '1999-01-08 04:05:06.543543543 GMT-08:00'")
+      .executeUpdate()
+    conn.commit()
+
+    conn.prepareStatement("CREATE TABLE test.array (ar ARRAY) " +
+      "AS SELECT '(1, 2, 3)'")
+      .executeUpdate()
+    conn.commit()

     conn.prepareStatement("create table test.flttypes (a DOUBLE, b REAL, c DECIMAL(38, 18))"
     ).executeUpdate()
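As background for the negative test in the next hunk, here is a minimal standalone probe (my sketch, not part of the PR) of the H2 1.4.195 behavior it depends on: the driver accepts TIMESTAMP WITH TIME ZONE in DDL, but converting such a column to java.sql.Timestamp is unimplemented, which is what surfaces through Spark's TimestampType getter as the wrapped UnsupportedOperationException. The in-memory URL and table name are arbitrary.

import java.sql.DriverManager

Class.forName("org.h2.Driver")
val conn = DriverManager.getConnection("jdbc:h2:mem:probe", "sa", "")
conn.prepareStatement(
  "CREATE TABLE tz (t TIMESTAMP WITH TIME ZONE) AS SELECT '1999-01-08 04:05:06 GMT-08:00'"
).executeUpdate()
val rs = conn.prepareStatement("SELECT t FROM tz").executeQuery()
rs.next()
try rs.getTimestamp(1)
catch {
  // H2 1.4.195 cannot materialize TIMESTAMP WITH TIME ZONE as java.sql.Timestamp;
  // the suite asserts this surfaces as "unimplemented".
  case e: UnsupportedOperationException => println(e.getMessage)
}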
@@ -919,6 +928,17 @@ class JDBCSuite extends SparkFunSuite
     assert(res === (foobarCnt, 0L, foobarCnt) :: Nil)
   }

+  test("unsupported types") {
+    var e = intercept[SparkException] {
+      spark.read.jdbc(urlWithUserAndPass, "TEST.TIMEZONE", new Properties()).collect()
+    }.getMessage
+    assert(e.contains("java.lang.UnsupportedOperationException: unimplemented"))
+    e = intercept[SQLException] {
+      spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY", new Properties()).collect()
+    }.getMessage
+    assert(e.contains("Unsupported type ARRAY"))
+  }
+
   test("SPARK-19318: Connection properties keys should be case-sensitive.") {
     def testJdbcOptions(options: JDBCOptions): Unit = {
       // Spark JDBC data source options are case-insensitive