Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
c7abad3
Added stringDataType option to jdbc connection properties
rama-mullapudi Aug 22, 2015
f1d0b9e
Added maxlength for field metadata so string types can use for VARCHAR.
rama-mullapudi Aug 27, 2015
faff507
Added maxlength for field metadata so string types can use for VARCHAR.
rama-mullapudi Aug 27, 2015
c4b4477
Added maxlength for field metadata so string types can use for VARCHAR.
rama-mullapudi Aug 27, 2015
35e61f3
Added method override for getJDBCType to take 2 parameters DataType a…
rama-mullapudi Sep 14, 2015
dd22b2f
Sync to master
rama-mullapudi Sep 14, 2015
cd809c5
Sync Master
rama-mullapudi Sep 26, 2015
0cfeefa
Sync Master
rama-mullapudi Sep 26, 2015
dddc137
Updated a call with added parameter
rama-mullapudi Sep 26, 2015
5f532e8
Fixed spark code style where comment exceeded 100 chars, and changed …
rama-mullapudi Sep 28, 2015
03f4d96
Merge remote-tracking branch 'upstream/master'
rama-mullapudi Sep 28, 2015
27f118b
Fixed sync with master issue
rama-mullapudi Sep 28, 2015
44e1978
Fixed JDBCSuite for DB2 to send extra parameter for getJDBCType
rama-mullapudi Sep 28, 2015
e605a11
Fixed JDBCSuite for DB2 to send extra parameter for getJDBCType
rama-mullapudi Sep 28, 2015
4cae11b
Fixed JDBCSuite for DB2 to send extra parameter for getJDBCType
rama-mullapudi Sep 29, 2015
4c2a7a4
Updated JDBCDialects to save VARCHAR size to metadata
rama-mullapudi Sep 29, 2015
a0cb024
Updated JDBCDialects to save VARCHAR size to metadata
rama-mullapudi Sep 29, 2015
d50bdf7
Changed call for getJDBCType to call both methods with datatype and m…
rama-mullapudi Oct 4, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,10 @@ object JdbcUtils extends Logging {
val dialect = JdbcDialects.get(url)
df.schema.fields foreach { field => {
val name = field.name
val typ: String =
dialect.getJDBCType(field.dataType).map(_.databaseTypeDefinition).getOrElse(
// Added getJDBCType with added parameter metadata
val typ: String = dialect.getJDBCType(field.dataType, field.metadata)
.map(_.databaseTypeDefinition).orElse(dialect.getJDBCType(field.dataType)
.map(_.databaseTypeDefinition)).getOrElse(
field.dataType match {
case IntegerType => "INTEGER"
case LongType => "BIGINT"
Expand Down Expand Up @@ -202,23 +204,27 @@ object JdbcUtils extends Logging {
properties: Properties = new Properties()) {
val dialect = JdbcDialects.get(url)
val nullTypes: Array[Int] = df.schema.fields.map { field =>
dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
field.dataType match {
case IntegerType => java.sql.Types.INTEGER
case LongType => java.sql.Types.BIGINT
case DoubleType => java.sql.Types.DOUBLE
case FloatType => java.sql.Types.REAL
case ShortType => java.sql.Types.INTEGER
case ByteType => java.sql.Types.INTEGER
case BooleanType => java.sql.Types.BIT
case StringType => java.sql.Types.CLOB
case BinaryType => java.sql.Types.BLOB
case TimestampType => java.sql.Types.TIMESTAMP
case DateType => java.sql.Types.DATE
case t: DecimalType => java.sql.Types.DECIMAL
case _ => throw new IllegalArgumentException(
// Added getJDBCType with added parameter metadata
dialect.getJDBCType(field.dataType, field.metadata)
.map(_.jdbcNullType).orElse(dialect.getJDBCType(field.dataType)
.map(_.jdbcNullType)).getOrElse(
field.dataType match {
case IntegerType => java.sql.Types.INTEGER
case LongType => java.sql.Types.BIGINT
case DoubleType => java.sql.Types.DOUBLE
case FloatType => java.sql.Types.REAL
case ShortType => java.sql.Types.INTEGER
case ByteType => java.sql.Types.INTEGER
case BooleanType => java.sql.Types.BIT
case StringType => java.sql.Types.CLOB
case BinaryType => java.sql.Types.BLOB
case TimestampType => java.sql.Types.TIMESTAMP
case DateType => java.sql.Types.DATE
case t: DecimalType => java.sql.Types.DECIMAL
case _ => throw new IllegalArgumentException(
s"Can't translate null value for field $field")
})
}
)
}

val rddSchema = df.schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ abstract class JdbcDialect {
*/
def getJDBCType(dt: DataType): Option[JdbcType] =
  // Delegate to the metadata-aware overload so that a dialect overriding only that overload
  // still answers this one consistently; the base implementation of the overload returns None,
  // so dialects that override neither keep the previous behaviour.
  getJDBCType(dt, Metadata.empty)

/**
 * Retrieve the jdbc / sql type for a given datatype, with the column metadata available
 * to the dialect (e.g. a recorded maximum length for string columns).
 * @param dt The datatype (e.g. [[org.apache.spark.sql.types.StringType]])
 * @param md The metadata attached to the column
 * @return The new JdbcType if there is an override for this DataType, None otherwise
 */
def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this is a DeveloperAPI, it is public so it would be good to fix without breaking binary compatibility.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about having getJDBCType(dt: DataType) call getJDBCType(dt, Metadata.empty) instead of returning None? Classes that extend JdbcDialect may implement only one of the two overloads, and then the two functions would behave completely differently.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I did not understand the change you are suggesting. Do you mean calling getJDBCType(dt, Metadata.empty) from getJDBCType(dt: DataType)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/**
* Quotes the identifier. This is used to put quotes around the identifier in case the column
* name is a reserved keyword, or in case it contains characters that require quotes (e.g. space).
Expand Down Expand Up @@ -138,7 +146,8 @@ object JdbcDialects {
registerDialect(PostgresDialect)
registerDialect(DB2Dialect)
registerDialect(MsSqlServerDialect)

registerDialect(OracleDialect)
registerDialect(NetezzaDialect)

/**
* Fetch the JdbcDialect class corresponding to a given database url.
Expand Down Expand Up @@ -173,8 +182,8 @@ class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption
}

override def getJDBCType(dt: DataType): Option[JdbcType] = {
dialects.flatMap(_.getJDBCType(dt)).headOption
override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = {
dialects.flatMap(_.getJDBCType(dt, md)).headOption
}
}

Expand Down Expand Up @@ -205,7 +214,7 @@ case object PostgresDialect extends JdbcDialect {
} else None
}

override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("TEXT", java.sql.Types.CHAR))
case BinaryType => Some(JdbcType("BYTEA", java.sql.Types.BINARY))
case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
Expand Down Expand Up @@ -253,10 +262,8 @@ case object MySQLDialect extends JdbcDialect {
*/
@DeveloperApi
case object DB2Dialect extends JdbcDialect {

override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2")

override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
case BooleanType => Some(JdbcType("CHAR(1)", java.sql.Types.CHAR))
case _ => None
Expand All @@ -278,3 +285,59 @@ case object MsSqlServerDialect extends JdbcDialect {
} else None
}
}

/**
* :: DeveloperApi ::
* Default Oracle dialect, mapping string/boolean on write to valid Oracle types.
*/
@DeveloperApi
case object OracleDialect extends JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
/**
 * Maps a VARCHAR column to a Catalyst string type and records its declared size under the
 * "maxlength" metadata key, which getJDBCType in this dialect reads back for StringType.
 * @param sqlType The java.sql.Types code reported for the column
 * @param typeName The database type name reported for the column
 * @param size The declared size of the column
 * @param md Builder that receives the "maxlength" entry
 * @return Some(StringType) for VARCHAR columns, None otherwise
 */
override def getCatalystType(
    sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
  // NOTE(review): Oracle may report the type name as VARCHAR2 rather than VARCHAR -- confirm
  // against the driver before relying on this branch.
  if (sqlType == Types.VARCHAR && typeName == "VARCHAR") {
    // Save varchar size to metadata
    md.putLong("maxlength", size)
    // A VARCHAR column is a string; returning LongType here made the stored maxlength
    // unusable, because getJDBCType only consults it for StringType.
    Some(StringType)
  } else {
    None
  }
}

override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = {
if (dt == StringType && md.contains("maxlength")) {
Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.VARCHAR))
} else if (dt == StringType ) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Remove the trailing space before the closing parenthesis.

Some(JdbcType("CLOB", java.sql.Types.CLOB))
} else if (dt == BooleanType ) {
Some(JdbcType("CHAR(1)", java.sql.Types.CHAR))
} else None
}
}

/**
 * :: DeveloperApi ::
 * Default Netezza dialect, mapping string/binary/boolean on write to Netezza type definitions
 * and recording the declared VARCHAR size in the column metadata on read.
 */
@DeveloperApi
case object NetezzaDialect extends JdbcDialect {

  /** Accepts every JDBC url that starts with "jdbc:netezza". */
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:netezza")

  /**
   * Maps a VARCHAR column to a Catalyst string type and records its declared size under the
   * "maxlength" metadata key, which getJDBCType below reads back for StringType.
   * @param sqlType The java.sql.Types code reported for the column
   * @param typeName The database type name reported for the column
   * @param size The declared size of the column
   * @param md Builder that receives the "maxlength" entry
   * @return Some(StringType) for VARCHAR columns, None otherwise
   */
  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.VARCHAR && typeName == "VARCHAR") {
      // Save varchar size to metadata
      md.putLong("maxlength", size)
      // A VARCHAR column is a string; returning LongType here made the stored maxlength
      // unusable, because getJDBCType only consults it for StringType.
      Some(StringType)
    } else {
      None
    }
  }

  /**
   * Chooses the Netezza column definition for a Catalyst type.
   * @param dt The datatype of the column being written
   * @param md The column metadata; a "maxlength" entry sizes the VARCHAR for strings
   * @return The JdbcType override for strings, binaries and booleans, None for anything else
   */
  override def getJDBCType(dt: DataType, md: Metadata): Option[JdbcType] = dt match {
    case StringType if md.contains("maxlength") =>
      Some(JdbcType(s"VARCHAR(${md.getLong("maxlength")})", java.sql.Types.VARCHAR))
    // No recorded length: fall back to a fixed-width default.
    case StringType => Some(JdbcType("VARCHAR(255)", java.sql.Types.VARCHAR))
    // NOTE(review): BYTEINT looks like an integer type name rather than a binary one --
    // confirm this is the intended target for binary columns.
    case BinaryType => Some(JdbcType("BYTEINT", java.sql.Types.BINARY))
    case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
    case _ => None
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
import org.apache.spark.util.{MetadataCleanerType, Utils}

class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext {
import testImplicits._
Expand Down Expand Up @@ -409,6 +409,8 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect)
assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect)
assert(JdbcDialects.get("jdbc:sqlserver://127.0.0.1/db") == MsSqlServerDialect)
assert(JdbcDialects.get("jdbc:oracle://127.0.0.1/db") == OracleDialect)
assert(JdbcDialects.get("jdbc:netezza://127.0.0.1/db") == NetezzaDialect)
assert(JdbcDialects.get("test.invalid") == NoopDialect)
}

Expand Down Expand Up @@ -448,8 +450,8 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext

test("DB2Dialect type mapping") {
  val db2Dialect = JdbcDialects.get("jdbc:db2://127.0.0.1/db")
  // Single-argument overload.
  assert(db2Dialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB")
  assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "CHAR(1)")
  // Metadata-aware overload: pass Metadata.empty instead of null so a dialect that inspects
  // the metadata cannot hit a NullPointerException.
  assert(
    db2Dialect.getJDBCType(StringType, Metadata.empty).map(_.databaseTypeDefinition).get ==
      "CLOB")
  assert(
    db2Dialect.getJDBCType(BooleanType, Metadata.empty).map(_.databaseTypeDefinition).get ==
      "CHAR(1)")
}

test("table exists query by jdbc dialect") {
Expand Down