-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-12010][SQL] Spark JDBC requires support for column-name-free INSERT syntax #10066
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…to master-SPARK-12010 Conflicts: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
|
Can one of the admins verify this patch? |
|
(For those keeping score this is a continuation of #10003 ) This is not required here or anything, but FYI you may want to manually squash and force-push your branch if you're starting over to collapse the merge commits and so on. The merge script will do it anyway but sometimes it's useful even during review. No action here though. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nits: braces and newline for the if; use a += 1 instead of a = a + 1 in the two lines below; extra blank line above near the imports.
You should probably also make this more idiomatic and compact. For example this while loop collapses to rddSchema.fields.map(_.name).mkString(", "), I believe. Similarly for the final while. And then the entire method doesn't need to manually build it up with StringBuilder. This is probably a couple lines of code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sean: I am working on your suggested changes. Looks like the code can
be collapsed into one or two lines.
Meanwhile I have found that inserting into an Oracle table having more
columns than columns in the dataframe results in
java.sql.BatchUpdateException: ORA-00947: not enough values if there are
any unmapped columns.
This does not matter, as long as the table matches exactly the
dataframe. But as soon as someone wants to insert into an existing table
with more columns than the dataframe has, this is a problem.
So it may indeed be better to include the suggested change for other
technologies as well.
The key question I see is: Is it okay to rely on the dataframe column
names matching the target table column names?
If so, do you suggest changing the default behaviour to include column
names for all dialects?
Does Spark automated tests have coverage for different databases? /
Would any regression be caught prior to merge?
btw,
re squashing commits: I will try, but for now I need to better
understand how all of this works in GitHub.
On 01.12.2015 13:17, Sean Owen wrote:
In
sql/core/src/main/scala/org/apache/spark/sql/jdbc/CassandraDialect.scala
#10066 (comment):
+private case object CassandraDialect extends JdbcDialect {
+
- override def canHandle(url: String): Boolean =
- url.startsWith("jdbc:datadirect:cassandra") ||
- url.startsWith("jdbc:weblogic:cassandra")
- override def getInsertStatement(table: String, rddSchema: StructType): String = {
- val sql = new StringBuilder(s"INSERT INTO $table ( ")
- var fieldsLeft = rddSchema.fields.length
- var i = 0
- // Build list of column names
- while (fieldsLeft > 0) {
sql.append(rddSchema.fields(i).name)if (fieldsLeft > 1) sql.append(", ")Nits: braces and newline for the |if|; use |a += 1| instead of |a = a
- 1| in the two lines below; extra blank line above near the imports.
You should probably also make this more idiomatic and compact. For
example this |while| loop collapses to
|rddSchema.fields.map(_.name).mkString(", ")|, I believe. Similarly
for the final |while|. And then the entire method doesn't need to
manually build it up with |StringBuilder|. This is probably a couple
lines of code.—
Reply to this email directly or view it on GitHub
https://github.com/apache/spark/pull/10066/files#r46270863.
Oracle http://www.oracle.com
Christian Kurz | Consulting Member of Technical Staff
Phone: +49 228 30899431 tel:+49%20228%2030899431 | Mobile: +49 170
2964124 tel:+49%20170%202964124
Oracle Product Development
ORACLE Deutschland B.V. & Co. KG | Hamborner Str. 51 | 40472 Düsseldorf
ORACLE Deutschland B.V. & Co. KG
Hauptverwaltung: Riesstr. 25, D-80992 München
Registergericht: Amtsgericht München, HRA 95603
Komplementärin: ORACLE Deutschland Verwaltung B.V.
Hertogswetering 163/167, 3543 AS Utrecht, Niederlande
Handelsregister der Handelskammer Midden-Niederlande, Nr. 30143697
Geschäftsführer: Alexander van der Ven, Astrid Kepper, Val Maher
Green Oracle http://www.oracle.com/commitment Oracle is committed to
developing practices and products that help protect the environment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're just saying that inserting a DataFrame of m columns into a table of n > m columns doesn't work, right? Yes without column name mappings, I expect this to fail anytime m != n, for any database. Right now this assumes m = n implicitly.
You're right that adding names requires a mapping from data frame column names to DB column names. Hm, I wonder if this needs an optional Map allowing for overrides.
I don't think the regression tests cover all databases, no. I also don't think this can be specific to Oracle anyway.
My workflow for squashing N of the last commits is:
git rebase -i HEAD~N- Change all but the first "pick" to "squash" in the editor and save
- Edit the commit message down to just 1 logical message and save
git push --force origin [your branch]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is exactly the problem encountered.
What if we drop the new Dialect and add a new signature on
DataFrameWriter instead (new columnMapping param):
def jdbc(url: String, table: String, connectionProperties: Properties,
columnMapping: Map<String,String>): Unit
The old signature then continues using the column-name-free INSERT
syntax, but for any advanced use-cases (or technologies, which do not
support column-name-free syntax) the new API can be used.
This ensures full backwards compatibility for all technologies
If this is the way to go, I'd better start a new PR?
My preference would still to keep the refactoring of moving generation
of INSERT statement into the Dialect (instead of in JDBCUtils). Does
this make sense?
On 02.12.2015 11:46, Sean Owen wrote:
In
sql/core/src/main/scala/org/apache/spark/sql/jdbc/CassandraDialect.scala
#10066 (comment):
+private case object CassandraDialect extends JdbcDialect {
+
- override def canHandle(url: String): Boolean =
- url.startsWith("jdbc:datadirect:cassandra") ||
- url.startsWith("jdbc:weblogic:cassandra")
- override def getInsertStatement(table: String, rddSchema: StructType): String = {
- val sql = new StringBuilder(s"INSERT INTO $table ( ")
- var fieldsLeft = rddSchema.fields.length
- var i = 0
- // Build list of column names
- while (fieldsLeft > 0) {
sql.append(rddSchema.fields(i).name)if (fieldsLeft > 1) sql.append(", ")You're just saying that inserting a DataFrame of m columns into a
table of n > m columns doesn't work, right? Yes without column name
mappings, I expect this to fail anytime m != n, for any database.
Right now this assumes m = n implicitly.You're right that adding names requires a mapping from data frame
column names to DB column names. Hm, I wonder if this needs an
optional |Map| allowing for overrides.I don't think the regression tests cover all databases, no. I also
don't think this can be specific to Oracle anyway.My workflow for squashing N of the last commits is:
- |git rebase -i HEAD~N|
- Change all but the first "pick" to "squash" in the editor and save
- Edit the commit message down to just 1 logical message and save
- |git push --force origin [your branch]|
—
Reply to this email directly or view it on GitHub
https://github.com/apache/spark/pull/10066/files#r46399876.
Oracle http://www.oracle.com
Christian Kurz | Consulting Member of Technical Staff
Phone: +49 228 30899431 tel:+49%20228%2030899431 | Mobile: +49 170
2964124 tel:+49%20170%202964124
Oracle Product Development
ORACLE Deutschland B.V. & Co. KG | Hamborner Str. 51 | 40472 Düsseldorf
ORACLE Deutschland B.V. & Co. KG
Hauptverwaltung: Riesstr. 25, D-80992 München
Registergericht: Amtsgericht München, HRA 95603
Komplementärin: ORACLE Deutschland Verwaltung B.V.
Hertogswetering 163/167, 3543 AS Utrecht, Niederlande
Handelsregister der Handelskammer Midden-Niederlande, Nr. 30143697
Geschäftsführer: Alexander van der Ven, Astrid Kepper, Val Maher
Green Oracle http://www.oracle.com/commitment Oracle is committed to
developing practices and products that help protect the environment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rxin are you the best person to ask about overloading DataFrameWriter.jdbc()?
Interesting question about maintaining the current behavior when no column name mapping is specified. In a way it still seems suboptimal to allow this behavior. What if there are the same number of columns, and all are the same type, but the ordering is different? you'd silently insert the wrong data in the wrong column.
Although specifying the DataFrame-to-table column name mapping can be optional (or, the caller can override only the names they want to) I think the SQL statement should be explicit. It does mean that someone who has a DataFrame with differently-named columns somehow might now encounter an exception, but I wonder if that's actually the right thing to enforce going forward. If it doesn't match up by name, don't proceed.
The API changes will take some care to make sure it's unintrusive and backwards compatible.
I suspect it doesn't do much harm to keep the insert statement logic in JdbcDialects though I imagine this behavior, whatever we decide, will be the right thing for all dialects, so it can be a default implementation there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would just go ahead and add the new overload of jdbc().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, will provide PR shortly
On 10.12.2015 23:44, Sean Owen wrote:
In
sql/core/src/main/scala/org/apache/spark/sql/jdbc/CassandraDialect.scala
#10066 (comment):
+private case object CassandraDialect extends JdbcDialect {
+
- override def canHandle(url: String): Boolean =
- url.startsWith("jdbc:datadirect:cassandra") ||
- url.startsWith("jdbc:weblogic:cassandra")
- override def getInsertStatement(table: String, rddSchema: StructType): String = {
- val sql = new StringBuilder(s"INSERT INTO $table ( ")
- var fieldsLeft = rddSchema.fields.length
- var i = 0
- // Build list of column names
- while (fieldsLeft > 0) {
sql.append(rddSchema.fields(i).name)if (fieldsLeft > 1) sql.append(", ")I would just go ahead and add the new overload of |jdbc()|.
—
Reply to this email directly or view it on GitHub
https://github.com/apache/spark/pull/10066/files#r47298240.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Opened PR #10312. - Please let me know your thoughts. - Thanks, Christian
On 10.12.2015 23:44, Sean Owen wrote:
In
sql/core/src/main/scala/org/apache/spark/sql/jdbc/CassandraDialect.scala
#10066 (comment):
+private case object CassandraDialect extends JdbcDialect {
+
- override def canHandle(url: String): Boolean =
- url.startsWith("jdbc:datadirect:cassandra") ||
- url.startsWith("jdbc:weblogic:cassandra")
- override def getInsertStatement(table: String, rddSchema: StructType): String = {
- val sql = new StringBuilder(s"INSERT INTO $table ( ")
- var fieldsLeft = rddSchema.fields.length
- var i = 0
- // Build list of column names
- while (fieldsLeft > 0) {
sql.append(rddSchema.fields(i).name)if (fieldsLeft > 1) sql.append(", ")I would just go ahead and add the new overload of |jdbc()|.
—
Reply to this email directly or view it on GitHub
https://github.com/apache/spark/pull/10066/files#r47298240.
No description provided.