diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 90343182712ed..8512496e5fe52 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -18,11 +18,12 @@ package org.apache.spark.sql.jdbc
 
 import java.sql.{Connection, Date, Timestamp}
-import java.util.Properties
+import java.util.{Properties, TimeZone}
 import java.math.BigDecimal
 
-import org.apache.spark.sql.{DataFrame, Row, SaveMode}
-import org.apache.spark.sql.execution.{WholeStageCodegenExec, RowDataSourceScanExec}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.execution.{RowDataSourceScanExec, WholeStageCodegenExec}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
@@ -77,6 +78,9 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     conn.prepareStatement(
       "INSERT INTO ts_with_timezone VALUES " +
         "(1, to_timestamp_tz('1999-12-01 11:00:00 UTC','YYYY-MM-DD HH:MI:SS TZR'))").executeUpdate()
+    conn.prepareStatement(
+      "INSERT INTO ts_with_timezone VALUES " +
+        "(2, to_timestamp_tz('1999-12-01 12:00:00 PST','YYYY-MM-DD HH:MI:SS TZR'))").executeUpdate()
     conn.commit()
 
     conn.prepareStatement(
@@ -235,6 +239,63 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     assert(types(1).equals("class java.sql.Timestamp"))
   }
 
+  test("Column type TIMESTAMP with SESSION_LOCAL_TIMEZONE is different from default") {
+    val defaultJVMTimeZone = TimeZone.getDefault
+    // Pick a time zone different from the current default time zone of the JVM
+    val sofiaTimeZone = TimeZone.getTimeZone("Europe/Sofia")
+    val shanghaiTimeZone = TimeZone.getTimeZone("Asia/Shanghai")
+    val localSessionTimeZone =
+      if (defaultJVMTimeZone == shanghaiTimeZone) sofiaTimeZone else shanghaiTimeZone
+
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> localSessionTimeZone.getID) {
+      val e = intercept[java.sql.SQLException] {
+        val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
+        dfRead.collect()
+      }.getMessage
+      assert(e.contains("Unrecognized SQL type -101"))
+    }
+  }
+
+  /**
+   * Changes the default JVM time zone to `timeZoneId` before executing `f`, then switches back
+   * to the original after `f` returns.
+   * @param timeZoneId the ID for a TimeZone, either an abbreviation such as "PST", a full name
+   *                   such as "America/Los_Angeles", or a custom ID such as "GMT-8:00".
+   */
+  private def withTimeZone(timeZoneId: String)(f: => Unit): Unit = {
+    val originalTimeZone = TimeZone.getDefault
+    try {
+      // Change the default JVM time zone while running `f`
+      TimeZone.setDefault(TimeZone.getTimeZone(timeZoneId))
+      f
+    } finally {
+      TimeZone.setDefault(originalTimeZone)
+    }
+  }
+
+  test("Column TIMESTAMP with TIME ZONE (JVM timezone)") {
+    def checkRow(row: Row, ts: String): Unit = {
+      assert(row.getTimestamp(1).equals(Timestamp.valueOf(ts)))
+    }
+
+    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> TimeZone.getDefault.getID) {
+      val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
+      withTimeZone("PST") {
+        assert(dfRead.collect().toSet ===
+          Set(
+            Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 03:00:00")),
+            Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 12:00:00"))))
+      }
+
+      withTimeZone("UTC") {
+        assert(dfRead.collect().toSet ===
+          Set(
+            Row(BigDecimal.valueOf(1), java.sql.Timestamp.valueOf("1999-12-01 11:00:00")),
+            Row(BigDecimal.valueOf(2), java.sql.Timestamp.valueOf("1999-12-01 20:00:00"))))
+      }
+    }
+  }
+
   test("SPARK-18004: Make sure date or timestamp related predicate is pushed down correctly") {
     val props = new Properties()
     props.put("oracle.jdbc.mapDateToTimestamp", "false")
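
The two new Oracle tests above rely on java.sql.Timestamp holding only an instant (milliseconds since the epoch): both Timestamp.valueOf and Timestamp.toString interpret wall-clock strings in the JVM default time zone, which is why the same stored instant matches "1999-12-01 03:00:00" under PST but "1999-12-01 11:00:00" under UTC. A minimal standalone sketch of that behavior, not part of the patch (TimestampZoneDemo is an illustrative name):

import java.sql.Timestamp
import java.time.Instant
import java.util.TimeZone

object TimestampZoneDemo extends App {
  // The instant stored for row (1, ...) above: 1999-12-01 11:00:00 UTC.
  val ts = Timestamp.from(Instant.parse("1999-12-01T11:00:00Z"))

  // Timestamp.valueOf parses its argument in the JVM default time zone,
  // so the same instant corresponds to different wall-clock strings.
  TimeZone.setDefault(TimeZone.getTimeZone("PST"))
  assert(ts == Timestamp.valueOf("1999-12-01 03:00:00")) // PST is UTC-8 in December

  TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
  assert(ts == Timestamp.valueOf("1999-12-01 11:00:00"))
}
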
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 48aba90afc787..be32cb89f4886 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -151,6 +151,8 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
 
   test("SPARK-20557: column type TIMESTAMP with TIME ZONE and TIME with TIME ZONE " +
     "should be recognized") {
+    // When reading TIMESTAMP with TIME ZONE and TIME with TIME ZONE columns through JDBC,
+    // the actual types reported by the driver are java.sql.Types.TIMESTAMP and java.sql.Types.TIME
     val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
     val rows = dfRead.collect()
     val types = rows(0).toSeq.map(x => x.getClass.toString)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 75c94fc486493..bbc95df4d9dc4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -226,10 +226,10 @@ object JdbcUtils extends Logging {
       case java.sql.Types.STRUCT        => StringType
       case java.sql.Types.TIME          => TimestampType
       case java.sql.Types.TIME_WITH_TIMEZONE
-                                        => TimestampType
+                                        => null
       case java.sql.Types.TIMESTAMP     => TimestampType
       case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
-                                        => TimestampType
+                                        => null
       case java.sql.Types.TINYINT       => IntegerType
       case java.sql.Types.VARBINARY     => BinaryType
       case java.sql.Types.VARCHAR       => StringType
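
Returning null from this mapping is what surfaces as a user-visible error: JdbcUtils converts a null answer into java.sql.SQLException("Unsupported type <name>"), the message the JDBCSuite change further below asserts. A simplified sketch of that control flow, with String standing in for Catalyst's DataType and toCatalystTypeOrThrow as a made-up name (the real method also handles every other java.sql.Types constant):

import java.sql.{JDBCType, SQLException, Types}

def toCatalystTypeOrThrow(sqlType: Int): String = {
  val answer: String = sqlType match {
    case Types.TIMESTAMP => "TimestampType"
    case Types.TIMESTAMP_WITH_TIMEZONE => null // now explicitly unsupported
    // Vendor-specific codes such as Oracle's TIMESTAMPTZ (-101) are not
    // java.sql.Types constants, so they fall through to this case:
    case _ => throw new SQLException("Unrecognized SQL type " + sqlType)
  }
  if (answer == null) {
    // JDBCType.valueOf(Types.TIMESTAMP_WITH_TIMEZONE).getName == "TIMESTAMP_WITH_TIMEZONE"
    throw new SQLException("Unsupported type " + JDBCType.valueOf(sqlType).getName)
  }
  answer
}
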
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index e3f106c41c7ff..6ef77f24460be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -18,7 +18,10 @@ package org.apache.spark.sql.jdbc
 
 import java.sql.{Date, Timestamp, Types}
+import java.util.TimeZone
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 
@@ -29,6 +32,13 @@ private case object OracleDialect extends JdbcDialect {
 
   override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
 
+  private def supportTimeZoneTypes: Boolean = {
+    val timeZone = DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone)
+    // TODO: support timezone types when users are not using the JVM timezone, which
+    // is the default value of SESSION_LOCAL_TIMEZONE
+    timeZone == TimeZone.getDefault
+  }
+
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
     sqlType match {
@@ -49,7 +59,8 @@ private case object OracleDialect extends JdbcDialect {
         case _ if scale == -127L => Option(DecimalType(DecimalType.MAX_PRECISION, 10))
         case _ => None
       }
-      case TIMESTAMPTZ => Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
+      case TIMESTAMPTZ if supportTimeZoneTypes
+        => Some(TimestampType) // Value for Timestamp with Time Zone in Oracle
       case BINARY_FLOAT => Some(FloatType) // Value for OracleTypes.BINARY_FLOAT
       case BINARY_DOUBLE => Some(DoubleType) // Value for OracleTypes.BINARY_DOUBLE
       case _ => None
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 61571bccdcb51..0767ca1573a7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1064,10 +1064,10 @@ class JDBCSuite extends SparkFunSuite
   }
 
   test("unsupported types") {
-    var e = intercept[SparkException] {
+    var e = intercept[SQLException] {
       spark.read.jdbc(urlWithUserAndPass, "TEST.TIMEZONE", new Properties()).collect()
     }.getMessage
-    assert(e.contains("java.lang.UnsupportedOperationException: unimplemented"))
+    assert(e.contains("Unsupported type TIMESTAMP_WITH_TIMEZONE"))
     e = intercept[SQLException] {
       spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY", new Properties()).collect()
     }.getMessage
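
Taken together, an Oracle TIMESTAMP WITH TIME ZONE column is now only readable while spark.sql.session.timeZone (SESSION_LOCAL_TIMEZONE) matches the JVM default time zone, which is its out-of-the-box value. A hedged usage sketch, assuming an active SparkSession named spark and a hypothetical connection URL:

import java.util.{Properties, TimeZone}

// Keep the session time zone aligned with the JVM default so that
// OracleDialect maps TIMESTAMPTZ to TimestampType.
spark.conf.set("spark.sql.session.timeZone", TimeZone.getDefault.getID)

val df = spark.read.jdbc(
  "jdbc:oracle:thin:@//dbhost:1521/orcl", // hypothetical URL
  "ts_with_timezone",                     // the test table created above
  new Properties)
df.collect() // succeeds: TIMESTAMPTZ -> TimestampType

// With any other session time zone, OracleDialect returns no mapping and the
// read fails with java.sql.SQLException: Unrecognized SQL type -101.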