Skip to content

Commit 2db4662

Browse files
CK50rxin
authored andcommitted
[SPARK-11989][SQL] Only use commit in JDBC data source if the underlying database supports transactions
Fixes [SPARK-11989](https://issues.apache.org/jira/browse/SPARK-11989) Author: CK50 <[email protected]> Author: Christian Kurz <[email protected]> Closes #9973 from CK50/branch-1.6_non-transactional. (cherry picked from commit a589736) Signed-off-by: Reynold Xin <[email protected]>
1 parent bf0e85a commit 2db4662

File tree

1 file changed

+19
-3
lines changed
  • sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc

1 file changed

+19
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.sql.{Connection, PreparedStatement}
2121
import java.util.Properties
2222

2323
import scala.util.Try
24+
import scala.util.control.NonFatal
2425

2526
import org.apache.spark.Logging
2627
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType, JdbcDialects}
@@ -125,8 +126,19 @@ object JdbcUtils extends Logging {
125126
dialect: JdbcDialect): Iterator[Byte] = {
126127
val conn = getConnection()
127128
var committed = false
129+
val supportsTransactions = try {
130+
conn.getMetaData().supportsDataManipulationTransactionsOnly() ||
131+
conn.getMetaData().supportsDataDefinitionAndDataManipulationTransactions()
132+
} catch {
133+
case NonFatal(e) =>
134+
logWarning("Exception while detecting transaction support", e)
135+
true
136+
}
137+
128138
try {
129-
conn.setAutoCommit(false) // Everything in the same db transaction.
139+
if (supportsTransactions) {
140+
conn.setAutoCommit(false) // Everything in the same db transaction.
141+
}
130142
val stmt = insertStatement(conn, table, rddSchema)
131143
try {
132144
var rowCount = 0
@@ -175,14 +187,18 @@ object JdbcUtils extends Logging {
175187
} finally {
176188
stmt.close()
177189
}
178-
conn.commit()
190+
if (supportsTransactions) {
191+
conn.commit()
192+
}
179193
committed = true
180194
} finally {
181195
if (!committed) {
182196
// The stage must fail. We got here through an exception path, so
183197
// let the exception through unless rollback() or close() want to
184198
// tell the user about another problem.
185-
conn.rollback()
199+
if (supportsTransactions) {
200+
conn.rollback()
201+
}
186202
conn.close()
187203
} else {
188204
// The stage must succeed. We cannot propagate any exception close() might throw.

0 commit comments

Comments
 (0)