Contributor

@EnricoMi EnricoMi commented Jun 8, 2023

What changes were proposed in this pull request?

This is a follow-up on #16685 and #16692.

Implements upsert mode for SaveMode.Append of the MySql, MsSql, and Postgres JDBC source.

See #41611 for an alternative using the MERGE INTO command (not supported by MySql).

Why are the changes needed?

The JDBC writer only supports either truncating the existing table or inserting. Duplicates, i.e. rows with identical values in the primary or unique index columns, cause an exception, which prevents updating existing rows while inserting new ones.

Re-evaluating a partition due to executor loss will insert rows that have already been inserted in an earlier attempt, which kills the entire Spark job.
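
For readers unfamiliar with upsert statements, here is a minimal sketch of what such statements look like for two of the databases named above. It is an illustration only, not the code in this PR: the object `UpsertSqlSketch`, the function `upsertSql`, and the dialect strings are hypothetical, identifiers are not quoted, and MsSql is left out for brevity.

```scala
object UpsertSqlSketch {
  // Hypothetical helper (not from this PR): builds a parameterized upsert
  // statement for `table`, inserting `columns` and resolving conflicts on `keys`.
  def upsertSql(dialect: String, table: String, columns: Seq[String], keys: Seq[String]): String = {
    require(keys.nonEmpty && keys.forall(k => columns.contains(k)),
      "keys must be a non-empty subset of columns")
    val nonKeys = columns.filterNot(c => keys.contains(c))
    require(nonKeys.nonEmpty, "at least one non-key column is needed for the update part")

    val cols = columns.mkString(", ")
    val placeholders = columns.map(_ => "?").mkString(", ")

    dialect match {
      case "postgresql" =>
        // PostgreSQL: EXCLUDED refers to the row proposed for insertion.
        val updates = nonKeys.map(c => s"$c = EXCLUDED.$c").mkString(", ")
        s"INSERT INTO $table ($cols) VALUES ($placeholders) " +
          s"ON CONFLICT (${keys.mkString(", ")}) DO UPDATE SET $updates"
      case "mysql" =>
        // MySQL: the conflict target is implied by the table's primary/unique keys.
        // VALUES(col) is deprecated in recent MySQL 8.0 releases but still accepted.
        val updates = nonKeys.map(c => s"$c = VALUES($c)").mkString(", ")
        s"INSERT INTO $table ($cols) VALUES ($placeholders) " +
          s"ON DUPLICATE KEY UPDATE $updates"
      case other =>
        throw new IllegalArgumentException(s"No upsert sketch for dialect: $other")
    }
  }
}
```

Because a retried partition re-sends rows with the same key values, statements of this shape update the existing rows instead of failing on the duplicate key.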

Does this PR introduce any user-facing change?

This adds upsert and upsertKeyColumns options for SaveMode.Append of the JDBC source.
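
A rough sketch of how the two options might be supplied. The option names `upsert` and `upsertKeyColumns` come from this PR; the URL, table name, key column, and the exact value format of `upsertKeyColumns` are assumptions made for illustration.

```scala
object UpsertOptionsExample {
  // Options for a JDBC write. Only the keys "upsert" and "upsertKeyColumns"
  // are taken from this PR; all values below are assumed placeholders.
  val jdbcUpsertOptions: Map[String, String] = Map(
    "url" -> "jdbc:postgresql://localhost:5432/testdb", // assumed connection URL
    "dbtable" -> "people",                              // assumed target table
    "upsert" -> "true",
    "upsertKeyColumns" -> "id"                          // assumed single key column
  )

  // With a DataFrame `df` in scope, the options would be passed to the writer as:
  //   df.write.format("jdbc").mode(SaveMode.Append).options(jdbcUpsertOptions).save()
}
```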

How was this patch tested?

Tests in JdbcSuite and integration suites.

Reopened as #49528.

melin commented Jun 9, 2023

Many databases support MERGE SQL, including Oracle:
https://issues.apache.org/jira/browse/SPARK-38200

Contributor

tableDoesNotSupportError("upserts", table)

Contributor Author

Method tableDoesNotSupportError requires a Table, whereas I only have the table name as a string.

Contributor

I see.

Contributor

@beliefer beliefer Jun 15, 2023

So we can refactor
private def tableDoesNotSupportError(cmd: String, table: Table): Throwable
to
private[sql] def tableDoesNotSupportError(cmd: String, tableName: String): Throwable and update the callers.

Contributor Author

Good idea, done in 085f9af.

Contributor

This seems unnecessary.

Contributor Author

Fixed.

Contributor

upserts or upsert?

Contributor Author

I'd go for upsert, as in upsert mode.

Contributor

Can we avoid adding a parameter?
Please reuse JdbcOptionsInWrite.

Contributor Author

This is needed because saveTable is called for all save modes, but we only want to use the upsert statement in Append mode for an existing table. All other code paths should use a plain insert.

We could reduce code complexity by removing this upsert argument and using options.isUpsert instead, but that would use upsert statements in situations where no upserts are needed.
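
To make the distinction concrete, here is a minimal sketch of the decision being discussed. It is an assumption-laden illustration, not the PR's code: `Mode`, `Options`, and `useUpsert` are hypothetical stand-ins, and only the name `isUpsert` is taken from this thread.

```scala
object SaveModeSketch {
  // Hypothetical stand-ins for Spark's save modes.
  sealed trait Mode
  case object Append extends Mode
  case object Overwrite extends Mode
  case object ErrorIfExists extends Mode

  // Hypothetical stand-in for the write options; only `isUpsert` mirrors the thread.
  final case class Options(isUpsert: Boolean)

  // Upsert statements are wanted only when appending to an existing table with
  // the upsert option set. Every other code path uses a plain insert.
  def useUpsert(mode: Mode, tableExists: Boolean, options: Options): Boolean =
    mode == Append && tableExists && options.isUpsert
}
```

Relying on `options.isUpsert` alone would drop the first two conditions, which is the concern raised above.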

Contributor

Yes, I know. We can avoid adding upsert here. If the options contain the upsert parameters in Append mode, we can use them directly. Otherwise, please ignore them.

Contributor

So we can simplify the implementation.

Contributor Author

How do I know the current save mode in saveTable?

Contributor

Get it from the options.

Contributor Author

The save mode is not part of the options; it is an argument to createRelation.

Contributor

@beliefer beliefer Oct 26, 2023

Why is the parameter named upsert?
It seems we should add a new save mode. Append cannot describe the semantics of the upsert operation.

Contributor Author

It is called upsert because it tells saveTable to use upsert statements rather than insert statements:

val insertStmt = if (upsert) {
  getUpsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect, options)
} else {
  getInsertStatement(table, rddSchema, tableSchema, isCaseSensitive, dialect)
}

I am open to introducing an Upsert save mode, but I would like to hear other committers' thoughts on this before I add it.

Contributor

@beliefer beliefer Oct 27, 2023

For better abstraction, I think we should introduce the Upsert mode as you said. Some data sources don't support the upsert statement that some JDBC databases provide, but we could still implement the Upsert mode by composing several operations, as is done for Overwrite mode.

@EnricoMi
Contributor Author

> Many databases support merge sql, including oracle https://issues.apache.org/jira/browse/SPARK-38200

see #41611

@EnricoMi EnricoMi changed the title from "[SPARK-19335][SQL] Add upserts for writing to JDBC" to "[SPARK-19335][SPARK-38200][SQL] Add upserts for writing to JDBC" on Jun 23, 2023
@jatin5251

@beliefer can you please approve the PR?

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Apr 20, 2024
@harshith-bolar-rapido

Any update on this? Upsert is very much needed for the JDBC sink.

melin commented Jun 7, 2024

> Any update on this? Upsert is very much needed for jdbc sink

https://github.com/melin/datatunnel supports JDBC upsert. Example, Postgres to Oracle:

https://github.com/melin/datatunnel/blob/master/examples/src/main/kotlin/com/superior/datatunnel/examples/oracle/OracleUpsertTest.kt

@EnricoMi
Contributor Author

Reopened as #49528.
