[SPARK-20427][SQL] Read JDBC table use custom schema #18266
Conversation
|
Test build #77887 has finished for PR 18266 at commit
|
|
Test build #77895 has finished for PR 18266 at commit
|
|
Jenkins, retest this please |
| def putMetadataArray(key: String, value: Array[Metadata]): this.type = put(key, value) | ||
|
|
||
| /** Puts a name. */ | ||
| def putName(name: String): this.type = put("name", name) |
This interface change is not desired. See PR #16209.
You can further enhance our parser by supporting the data types that are not natively supported by Spark.
|
Test build #77908 has finished for PR 18266 at commit
|
|
Test build #78093 has finished for PR 18266 at commit
|
| } | ||
|
|
||
| test("SPARK-16848: jdbc API throws an exception for user specified schema") { | ||
| ignore("SPARK-16848: jdbc API throws an exception for user specified schema") { |
?
JDBC didn't support a user-specified schema before:
https://github.com/apache/spark/blob/v2.2.0-rc5/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L188
Then, we should remove this test case.
| StructField("N1", IntegerType, true, new MetadataBuilder().putString("name", "N1").build()), | ||
| StructField("N2", BooleanType, true, new MetadataBuilder().putString("name", "N2").build()))) | ||
|
|
||
| val dfRead = spark.read.schema(schema).jdbc(jdbcUrl, "custom_column_types", new Properties()) |
?
| val schema = StructType(Seq( | ||
| StructField("ID", DecimalType(DecimalType.MAX_PRECISION, 0), true, | ||
| new MetadataBuilder().putString("name", "ID").build()), | ||
| StructField("N1", IntegerType, true, new MetadataBuilder().putString("name", "N1").build()), |
Why add new MetadataBuilder().putString("name", "N1").build()?
JDBCRDD will read metadata:
https://github.com/apache/spark/blob/v2.2.0-rc5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala#L85
I'll change this in the next commit.
| */ | ||
| private def pruneSchema(schema: StructType, columns: Array[String]): StructType = { | ||
| val fieldMap = Map(schema.fields.map(x => x.metadata.getString("name") -> x): _*) | ||
| val fieldMap = Map(schema.fields.map(x => x.name -> x): _*) |
x.metadata.getString("name") always equals x.name:
https://github.com/apache/spark/blob/v2.2.0-rc5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L291
This is not a related change. Could you revert it back?
CatalystSqlParser.parseTableSchema(columnTypes) constructs a StructType without metadata, so we get this error message:
key not found: name
java.util.NoSuchElementException: key not found: name
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at org.apache.spark.sql.types.Metadata.get(Metadata.scala:111)
at org.apache.spark.sql.types.Metadata.getString(Metadata.scala:60)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$1.apply(JDBCRDD.scala:83)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$1.apply(JDBCRDD.scala:83)
|
Test build #78526 has finished for PR 18266 at commit
|
| // TODO: to reuse the existing partition parameters for those partition specific options | ||
| val createTableOptions = parameters.getOrElse(JDBC_CREATE_TABLE_OPTIONS, "") | ||
| val createTableColumnTypes = parameters.get(JDBC_CREATE_TABLE_COLUMN_TYPES) | ||
| val customSchema = parameters.get(JDBC_CUSTOM_SCHEMA) |
convert it to StructType here.
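For illustration, a minimal sketch of that conversion; the helper name is hypothetical, and it relies on CatalystSqlParser.parseTableSchema, the same parser discussed later in this thread:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.StructType

// Parse the user-supplied "col TYPE, col TYPE, ..." string into a StructType
// as soon as the JDBC options are constructed, so later code works with types
// rather than a raw string.
def parseCustomSchema(customSchema: Option[String]): Option[StructType] =
  customSchema.map(CatalystSqlParser.parseTableSchema)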
|
I am fine with supporting a customized schema for the read path of the JDBC relation. However, we need to check whether the user-specified schema matches the underlying table schema. If it does not match, we need to catch it earlier and issue a proper error message. |
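A rough sketch of such a check (names are assumptions; it mirrors the name-resolution approach the PR eventually adopts, and is assumed to live inside Spark's org.apache.spark.sql package so that AnalysisException can be constructed):

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.types.StructType

// Fail early if a user-specified column does not exist in the schema
// resolved from the JDBC table.
def validateCustomSchema(
    tableSchema: StructType,
    userSchema: StructType,
    nameEquality: Resolver): Unit = {
  userSchema.fieldNames.foreach { col =>
    if (!tableSchema.fields.exists(f => nameEquality(f.name, col))) {
      throw new AnalysisException(
        s"Column $col not found in the JDBC table schema: " +
          tableSchema.fieldNames.mkString(", "))
    }
  }
}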
|
Test build #79139 has finished for PR 18266 at commit
|
|
Test build #79137 has finished for PR 18266 at commit
|
|
Test build #79136 has finished for PR 18266 at commit
|
|
The example in the PR description looks a little bit confusing: val dfRead = spark.read.schema(schema).jdbc(jdbcUrl, "tableWithCustomSchema", new Properties()). Could you update it? |
| this.extraOptions ++= properties.asScala | ||
| // explicit url and dbtable should override all | ||
| this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table) | ||
| if (userSpecifiedSchema.isDefined) { |
Please also update the other API at line 273.
|
|
||
| // default will throw IllegalArgumentException | ||
| val e = intercept[org.apache.spark.SparkException] { | ||
| spark.read.jdbc(jdbcUrl, "custom_column_types", new Properties()).collect() |
Nit: Change the table names in all the test cases.
|
Test build #81084 has finished for PR 18266 at commit
|
|
As far as I understand the proposed solution recommends using |
|
Yes, mapping to Double seems fine. This test passed:
test("SPARK-20427/SPARK-20921: read table use custom schema by jdbc api") {
// default will throw IllegalArgumentException
val e = intercept[org.apache.spark.SparkException] {
spark.read.jdbc(jdbcUrl, "tableWithCustomSchema", new Properties()).collect()
}
assert(e.getMessage.contains(
"requirement failed: Decimal precision 39 exceeds max precision 38"))
// custom schema can read data
val props = new Properties()
props.put("customDataFrameColumnTypes",
s"ID double, N1 int, N2 boolean")
val dfRead = spark.read.jdbc(jdbcUrl, "tableWithCustomSchema", props)
val rows = dfRead.collect()
// verify the data type
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types(0).equals("class java.lang.Double"))
assert(types(1).equals("class java.lang.Integer"))
assert(types(2).equals("class java.lang.Boolean"))
// verify the value
val values = rows(0)
assert(values.getDouble(0).equals(12312321321321312312312312123D))
assert(values.getInt(1).equals(1))
assert(values.getBoolean(2).equals(false))
}
|
|
Will review this today. |
docs/sql-programming-guide.md
Outdated
| </tr> | ||
|
|
||
| <tr> | ||
| <td><code>customDataFrameColumnTypes</code></td> |
customSchema
docs/sql-programming-guide.md
Outdated
| <tr> | ||
| <td><code>customDataFrameColumnTypes</code></td> | ||
| <td> | ||
| The DataFrame column data types to use instead of the defaults when reading data from jdbc API. (e.g: <code>"id DECIMAL(38, 0), name STRING")</code>. The specified types should be valid spark sql data types. This option applies only to reading. |
This is not limited to DataFrame.
The custom schema to use for reading data from JDBC connectors. For example,
"id DECIMAL(38, 0), name STRING"). The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
| .option("dbtable", "schema.tablename") \ | ||
| .option("user", "username") \ | ||
| .option("password", "password") \ | ||
| .option("customDataFrameColumnTypes", "id DECIMAL(38, 0), name STRING") \ |
customSchema
| val jdbcDF2 = spark.read | ||
| .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) | ||
| // Specifying dataframe column data types on read | ||
| connectionProperties.put("customDataFrameColumnTypes", "id DECIMAL(38, 0), name STRING") |
customSchema
| connectionProperties.put("password", "password") | ||
| val jdbcDF2 = spark.read | ||
| .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties) | ||
| // Specifying dataframe column data types on read |
Specifying the custom data types of the read schema
| */ | ||
| def jdbc(url: String, table: String, properties: Properties): DataFrame = { | ||
| assertNoSpecifiedSchema("jdbc") | ||
| assertJdbcAPISpecifiedDataFrameSchema() |
Users should be able to do it either way. If users specify them in both the schema() API and the customSchema option, we should issue an exception.
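For illustration, a minimal sketch of that guard; the method name and where it is called from are assumptions:

import org.apache.spark.sql.types.StructType

// Reject the read if a schema is supplied through both the schema() API
// and the customSchema JDBC option.
def assertNoConflictingSchemas(
    userSpecifiedSchema: Option[StructType],
    customSchema: Option[String]): Unit = {
  if (userSpecifiedSchema.isDefined && customSchema.isDefined) {
    throw new IllegalArgumentException(
      "Specify the schema either via the schema() API or the customSchema option, not both.")
  }
}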
|
Test build #81536 has finished for PR 18266 at commit
|
| */ | ||
| private def pruneSchema(schema: StructType, columns: Array[String]): StructType = { | ||
| val fieldMap = Map(schema.fields.map(x => x.metadata.getString("name") -> x): _*) | ||
| val fieldMap = Map(schema.fields.map(x => x.name -> x): _*) |
Sorry, I did not get your point. Could you show me an example? Is it a behavior-breaking change?
scala> org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parseTableSchema("id int, name string").fields.map(x => x.metadata.getString("name") -> x)
java.util.NoSuchElementException: key not found: name
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at org.apache.spark.sql.types.Metadata.get(Metadata.scala:111)
at org.apache.spark.sql.types.Metadata.getString(Metadata.scala:60)
at $anonfun$1.apply(<console>:24)
at $anonfun$1.apply(<console>:24)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
... 48 elided
|
Test build #81601 has finished for PR 18266 at commit
|
gatorsmile left a comment
Looks pretty good! Thanks!
| .option("dbtable", "schema.tablename") \ | ||
| .option("user", "username") \ | ||
| .option("password", "password") \ | ||
| .option("customDataFrameColumnTypes", "id DECIMAL(38, 0), name STRING") \ |
Please rename this.
| */ | ||
| private def pruneSchema(schema: StructType, columns: Array[String]): StructType = { | ||
| val fieldMap = Map(schema.fields.map(x => x.metadata.getString("name") -> x): _*) | ||
| val fieldMap = Map(schema.fields.map(x => x.name -> x): _*) |
I see. Could we just get rid of the line where we put name in the metadata?
It seems safe to remove this line.
| sqlContext.sessionState.conf.resolver) | ||
| } else { | ||
| schema | ||
| } |
val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
jdbcOptions.customSchema match {
  case Some(customSchema) => JdbcUtils.parseUserSpecifiedColumnTypes(
    tableSchema, customSchema, sparkSession.sessionState.conf.resolver)
  case None => tableSchema
}
| */ | ||
| def parseUserSpecifiedColumnTypes( | ||
| schema: StructType, | ||
| columnTypes: String, |
def getCustomSchema(
    tableSchema: StructType,
    customSchema: String,
    nameEquality: Resolver): StructType = {
| userSchema.fieldNames.foreach { col => | ||
| schema.find(f => nameEquality(f.name, col)).getOrElse { | ||
| throw new AnalysisException( | ||
| s"${JDBCOptions.JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES} option column $col not found in " + |
val colNames = tableSchema.fieldNames.mkString(",")
throw new AnalysisException(s"Please provide all the columns, all columns are: $colNames")
|
Test build #81719 has finished for PR 18266 at commit
|
|
@wangyum Could you update the example in the PR description? |
|
@gatorsmile Done |
|
LGTM |
|
Thanks! Merged to master. |
…partial fields.

## What changes were proposed in this pull request?

apache#18266 added a new feature to support reading a JDBC table with a custom schema, but all the fields must be specified. For simplicity, this PR supports specifying partial fields.

## How was this patch tested?

unit tests

Author: Yuming Wang <[email protected]>

Closes apache#19231 from wangyum/SPARK-22002.
What changes were proposed in this pull request?
The auto-generated Oracle schema is sometimes not what we expect:
- number(1) is auto-mapped to BooleanType, which is sometimes not what we expect, per SPARK-20921.
- number is auto-mapped to Decimal(38,10), which can't read big values, per SPARK-20427.
This PR fixes this issue by letting users specify a custom schema, as shown in the sketch below.
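A minimal sketch of the intended usage, based on the test case earlier in this thread and on the renamed customSchema option requested by the reviewers; the table name tableWithCustomSchema, the jdbcUrl value, and the SparkSession spark are assumed to be in scope:

import java.util.Properties

// Override the default Oracle type mapping: NUMBER would otherwise be read as
// Decimal(38, 10) and fail on values whose precision exceeds 38.
val props = new Properties()
props.put("customSchema", "ID decimal(38, 0), N1 int, N2 boolean")
val dfRead = spark.read.jdbc(jdbcUrl, "tableWithCustomSchema", props)
dfRead.show()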
How was this patch tested?
unit tests