Conversation

@dongjoon-hyun
Member

@dongjoon-hyun dongjoon-hyun commented Oct 27, 2016

What changes were proposed in this pull request?

Apache Spark supports the following cases by quoting RDD column names while saving through JDBC.

  • Allow a reserved keyword as a column name, e.g., 'order'.

  • Allow mixed-case column names, e.g., [a: int, A: int]:

    scala> val df = sql("select 1 a, 1 A")
    df: org.apache.spark.sql.DataFrame = [a: int, A: int]
    ...
    scala> df.write.mode("overwrite").format("jdbc").options(option).save()
    scala> df.write.mode("append").format("jdbc").options(option).save()

This PR aims to use database column names instead of RDD column names in order to additionally support the following case.
Note that this case previously succeeded on MySQL but failed on Postgres/Oracle (a sketch follows the example).

val df1 = sql("select 1 a")
val df2 = sql("select 1 A")
...
df1.write.mode("overwrite").format("jdbc").options(option).save()
df2.write.mode("append").format("jdbc").options(option).save()

How was this patch tested?

Pass the Jenkins tests with a new test case.

@SparkQA

SparkQA commented Oct 28, 2016

Test build #67673 has finished for PR 15664 at commit 9558f96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


@dongjoon-hyun
Member Author

Retest this please.

@SparkQA

SparkQA commented Nov 10, 2016

Test build #68459 has finished for PR 15664 at commit 9558f96.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

The two failures seem to be unrelated to this PR.

[info] - randomized aggregation test - [typed, with partial + safe] - with grouping keys - with non-empty input *** FAILED *** (1 second, 367 milliseconds)

@dongjoon-hyun
Member Author

Retest this please.

@SparkQA

SparkQA commented Nov 10, 2016

Test build #68481 has finished for PR 15664 at commit 9558f96.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 2, 2016

Test build #69584 has finished for PR 15664 at commit 4bad553.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

nameMap -> lowercaseNameMap.

@dongjoon-hyun
Member Author

Thank you for the review, @viirya.
I'll update it like that.

@gatorsmile
Member

This is a bug fix, right? Will review this tomorrow.

@dongjoon-hyun
Member Author

Yes, right! Thank you, @gatorsmile !

@SparkQA

SparkQA commented Dec 5, 2016

Test build #69658 has finished for PR 15664 at commit 528ccfe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@viirya viirya Dec 5, 2016

I think the table schema won't change while we insert all the data (or is that possible?). Right now you ask for the table schema for every insert statement. Can we do this once on the caller side (i.e., in savePartition) and then reuse the schema?
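
A rough sketch of this suggestion, with hypothetical names (saveRows and insertSql are illustrative, not the PR's API): the statement is derived from the schema once, outside the row loop, and reused for every batched insert.

    import java.sql.Connection

    // Prepared once per partition; the column list behind insertSql comes from
    // the table schema fetched a single time by the caller.
    def saveRows(conn: Connection, insertSql: String,
                 rows: Iterator[Seq[Any]], batchSize: Int = 1000): Unit = {
      val stmt = conn.prepareStatement(insertSql)
      try {
        var count = 0
        rows.foreach { row =>
          row.iterator.zipWithIndex.foreach { case (v, i) =>
            stmt.setObject(i + 1, v.asInstanceOf[AnyRef])
          }
          stmt.addBatch()
          count += 1
          if (count % batchSize == 0) stmt.executeBatch()
        }
        if (count % batchSize != 0) stmt.executeBatch()
      } finally {
        stmt.close()
      }
    }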

Member Author

Thank you for the review, @viirya. I'll try to update it like that.

@SparkQA

SparkQA commented Dec 26, 2016

Test build #70577 has finished for PR 15664 at commit 11f5874.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

We can get the table schema when checking whether the table exists.

Member Author

Thank you for the review, @gatorsmile.
Yes, that looks great! We can use getSchemaQuery instead of tableExists.
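
For reference, a sketch of the schema query's assumed default shape; it returns zero rows but still exposes the table's column metadata:

    // Assumed default; dialects can override this with a cheaper variant.
    def getSchemaQuery(table: String): String = s"SELECT * FROM $table WHERE 1=0"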

Member

The name resolution should still be controlled by spark.sql.caseSensitive, right?

Member Author

Yep. I'll fix that.
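
A one-line sketch, assuming this runs inside Spark's sql package where sessionState is visible; the resolution should branch on this flag instead of a hard-coded value:

    // spark.sql.caseSensitive is surfaced on the session's SQLConf.
    val caseSensitive: Boolean = spark.sessionState.conf.caseSensitiveAnalysis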

Member

Can we build the INSERT SQL statement in saveTable based on the schema? No need to prepare the generated statement in saveTable.

@dongjoon-hyun
Member Author

dongjoon-hyun commented Dec 27, 2016

The PR is updated to

  • get table schema once in createRelation
  • respect spark.sql.caseSensitive

For insertStatement, it seems better to keep the current structure so that the INSERT PreparedStatement generation code (like dialect.quoteIdentifier) stays isolated.
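
A minimal sketch of that isolation, with assumed signature details (not the exact patch):

    import java.sql.{Connection, PreparedStatement}
    import org.apache.spark.sql.jdbc.JdbcDialect
    import org.apache.spark.sql.types.StructType

    // All dialect-specific quoting stays in one helper that builds the
    // INSERT PreparedStatement from the resolved column names.
    def insertStatement(conn: Connection, table: String, rddSchema: StructType,
                        dialect: JdbcDialect): PreparedStatement = {
      val columns = rddSchema.fields.map(f => dialect.quoteIdentifier(f.name)).mkString(",")
      val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
      conn.prepareStatement(s"INSERT INTO $table ($columns) VALUES ($placeholders)")
    }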

@SparkQA

SparkQA commented Dec 27, 2016

Test build #70642 has finished for PR 15664 at commit e0a467c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 27, 2016

Test build #70643 has finished for PR 15664 at commit f803f41.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case Success(v) => Some(v)
case Failure(e) => None

Member Author

I see. Thank you for the review! I wrote it that way to keep the same logic as tableExists; I thought the guideline applied to code returning Try as a return value, which existed before this PR.

Sure, I'll remove the usage of Try/Success/Failure.
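
A hedged sketch of the Try-free shape (the helper name and exact query are assumptions):

    import java.sql.{Connection, SQLException}

    // Catch the SQLException directly and return Option instead of matching on
    // Success/Failure. Column names are read before the statement is closed,
    // since closing it invalidates the result set's metadata.
    def getColumnNamesOption(conn: Connection, table: String): Option[Seq[String]] = {
      try {
        val stmt = conn.prepareStatement(s"SELECT * FROM $table WHERE 1=0")
        try {
          val meta = stmt.executeQuery().getMetaData
          Some((1 to meta.getColumnCount).map(meta.getColumnName))
        } finally {
          stmt.close()
        }
      } catch {
        case _: SQLException => None
      }
    }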

}
}

/**
Member

saving schema is not right. We need a better name here.

rddSchema's sequence and tableSchema's name -> rddSchema's column sequence and tableSchema's column names

Member

Here, we need to explain why we use the column sequence from rddSchema and the column names from tableSchema.

Member Author

Yep. I'll add more accurate details here.

if (nameMap.isDefinedAt(f.name)) {
  // identical names
  schema = schema.add(nameMap(f.name))
} else if (!caseSensitive && lowercaseNameMap.isDefinedAt(f.name.toLowerCase)) {
Member

We need to improve the comments. Actually, we return the case-sensitive column names.

Member Author

My bad. I meant case-insensitively identical names. I'll revise this.

} else if (!caseSensitive && lowercaseNameMap.isDefinedAt(f.name.toLowerCase)) {
  // case-insensitive identical names
  schema = schema.add(lowercaseNameMap(f.name.toLowerCase))
} else {
Member

org.apache.spark.SparkException -> AnalysisException

Member Author

Sure!

saveTable(df, url, table, jdbcOptions)
df.schema
}
saveTable(df, url, table, savingSchema, jdbcOptions)
Member

@gatorsmile gatorsmile Dec 28, 2016

How about passing the table schema and resolving/merging the schemas inside saveTable? It might simplify the code.

Member Author

That could work. But to do that, JdbcUtils.saveTable would need to understand SaveMode, too. Is that okay?
Currently, JdbcUtils only provides somewhat primitive APIs.
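
For the record, a hypothetical signature for that alternative (names are illustrative; it would pull SaveMode handling down into the utility layer):

    import org.apache.spark.sql.{DataFrame, SaveMode}
    import org.apache.spark.sql.types.StructType

    // Hypothetical: saveTable receives the SaveMode plus the optional table
    // schema and performs the resolve/merge itself.
    def saveTable(df: DataFrame, url: String, table: String, mode: SaveMode,
                  tableSchema: Option[StructType]): Unit = ???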

@SparkQA

SparkQA commented Dec 28, 2016

Test build #70662 has finished for PR 15664 at commit 8d520dd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 28, 2016

Test build #70663 has finished for PR 15664 at commit 66af06d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

The failure is not related to this PR.

[info] - fatal errors from a source should be sent to the user *** FAILED *** (84 milliseconds)
[info]   org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 5ae8eeb2-81f1-44df-b8ad-55b780512ee5, runId = 3a5bfebc-e3a3-496e-987b-e0e789f1fdcb] terminated with exception: null

// In this case, we should truncate table and then load.
truncateTable(conn, table)
saveTable(df, url, table, jdbcOptions)
val tableSchema = JdbcUtils.getSchemaOption(conn, url, table)
Member Author

I moved this into the case statements.
Since JdbcUtils.tableExists is used, getSchemaOption can be skipped for the other SaveModes.

    isCaseSensitive: Boolean,
    dialect: JdbcDialect): String = {
  val columns = if (tableSchema.isEmpty) {
    rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
Member Author

The legacy behavior is used when tableSchema is None.
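
For context, a hedged reconstruction of the full column-list selection (the Some branch is inferred from the discussion, not copied from the patch):

    import org.apache.spark.sql.jdbc.JdbcDialect
    import org.apache.spark.sql.types.StructType

    // None: quote the RDD column names as before (legacy behavior).
    // Some: emit the database's own column names, matching case-insensitively
    // unless isCaseSensitive is set.
    def columnList(tableSchema: Option[StructType], rddSchema: StructType,
                   isCaseSensitive: Boolean, dialect: JdbcDialect): String = {
      tableSchema match {
        case None =>
          rddSchema.fields.map(f => dialect.quoteIdentifier(f.name)).mkString(",")
        case Some(ts) =>
          rddSchema.fields.map { f =>
            val matched = ts.fields.find { t =>
              if (isCaseSensitive) t.name == f.name else t.name.equalsIgnoreCase(f.name)
            }
            dialect.quoteIdentifier(matched.map(_.name).getOrElse(f.name))
          }.mkString(",")
      }
    }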

@SparkQA

SparkQA commented Dec 30, 2016

Test build #70746 has finished for PR 15664 at commit 54adaf5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

The only failure is unrelated to this PR.

[info] StreamSuite:
[info] - fatal errors from a source should be sent to the user *** FAILED *** (84 milliseconds)
[info]   org.apache.spark.sql.streaming.StreamingQueryException: Query [id = d522aafe-085e-43e4-b796-037695dec113, runId = fc2f679a-907a-4445-8042-9198649bb55d] terminated with exception: null
[info]   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:296)
[info]   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:186)
[info]   Cause: org.apache.spark.sql.streaming.StreamSuite$$anonfun$12$$anon$2:

@dongjoon-hyun
Member Author

Retest this please.

@SparkQA

SparkQA commented Dec 30, 2016

Test build #70749 has finished for PR 15664 at commit 54adaf5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

LGTM

@gatorsmile
Member

Merging to master. Thanks!

@asfgit asfgit closed this in b85e294 Dec 30, 2016
@dongjoon-hyun
Member Author

Thank you, @gatorsmile .
Happy New Year! :)

cmonkey pushed a commit to cmonkey/spark that referenced this pull request Jan 1, 2017

…ing JDBC Writing

Closes apache#15664 from dongjoon-hyun/SPARK-18123.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017

…ing JDBC Writing
@dongjoon-hyun dongjoon-hyun deleted the SPARK-18123 branch January 7, 2019 07:03