3 files changed: +39 −1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc

@@ -224,6 +224,7 @@ private[sql] object JDBCRDD extends Logging {
       quotedColumns,
       filters,
       parts,
+      url,
       properties)
   }
 }
@@ -241,6 +242,7 @@ private[sql] class JDBCRDD(
     columns: Array[String],
     filters: Array[Filter],
     partitions: Array[Partition],
+    url: String,
     properties: Properties)
   extends RDD[InternalRow](sc, Nil) {

@@ -361,6 +363,9 @@ private[sql] class JDBCRDD(
     context.addTaskCompletionListener{ context => close() }
     val part = thePart.asInstanceOf[JDBCPartition]
     val conn = getConnection()
+    val dialect = JdbcDialects.get(url)
+    import scala.collection.JavaConverters._
+    dialect.beforeFetch(conn, properties.asScala.toMap)

     // H2's JDBC driver does not support the setSchema() method. We pass a
     // fully-qualified table name in the SELECT statement. I don't know how to
@@ -489,6 +494,13 @@ private[sql] class JDBCRDD(
     }
     try {
       if (null != conn) {
+        if (!conn.getAutoCommit && !conn.isClosed) {
+          try {
+            conn.commit()
+          } catch {
+            case e: Throwable => logWarning("Exception committing transaction", e)
+          }
+        }
         conn.close()
       }
       logInfo("closed connection")
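The close() change above matters because beforeFetch may have disabled autocommit: closing a connection that still holds an open read transaction can roll it back or leak it, depending on the driver. A minimal standalone sketch of the same pattern in plain JDBC (the URL, table name, and fetch size are placeholders, not values from this change):

    import java.sql.DriverManager

    // Placeholder URL; assumes a reachable Postgres instance.
    val conn = DriverManager.getConnection("jdbc:postgresql://localhost/testdb")
    conn.setAutoCommit(false)        // cursor-based fetching requires an open transaction
    try {
      val stmt = conn.createStatement()
      stmt.setFetchSize(1000)        // stream rows in batches instead of caching them all
      val rs = stmt.executeQuery("SELECT * FROM big_table")
      while (rs.next()) { /* consume each row */ }
    } finally {
      // Same guard as the diff: commit the read transaction before closing.
      if (!conn.getAutoCommit && !conn.isClosed) conn.commit()
      conn.close()
    }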
sql/core/src/main/scala/org/apache/spark/sql/jdbc

@@ -17,6 +17,8 @@

 package org.apache.spark.sql.jdbc

+import java.sql.Connection
+
 import org.apache.spark.sql.types._
 import org.apache.spark.annotation.DeveloperApi

@@ -97,6 +99,15 @@ abstract class JdbcDialect extends Serializable {
     s"SELECT * FROM $table WHERE 1=0"
   }

+  /**
+   * Override connection-specific properties before a select is run. This hook exists to
+   * let dialects that need special treatment optimize fetch behavior.
+   * @param connection The connection object
+   * @param properties The connection properties, passed through from the relation.
+   */
+  def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
+  }
+
 }

 /**
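As a hedged illustration of the new hook (everything below is hypothetical and not part of this change), a custom dialect could override beforeFetch and register itself so that JdbcDialects.get(url) in JDBCRDD.compute resolves it by URL prefix:

    import java.sql.Connection
    import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

    // Hypothetical dialect; "jdbc:example" is an illustrative URL prefix.
    object ExampleDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:example")

      override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
        // Standard JDBC hint; some drivers optimize read-only connections.
        connection.setReadOnly(true)
      }
    }

    JdbcDialects.registerDialect(ExampleDialect)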
sql/core/src/main/scala/org/apache/spark/sql/jdbc

@@ -17,7 +17,7 @@

 package org.apache.spark.sql.jdbc

-import java.sql.Types
+import java.sql.{Connection, Types}

 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.types._
@@ -70,4 +70,19 @@ private object PostgresDialect extends JdbcDialect {
   override def getTableExistsQuery(table: String): String = {
     s"SELECT 1 FROM $table LIMIT 1"
   }
+
+  override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
+    super.beforeFetch(connection, properties)
+
+    // According to the Postgres JDBC documentation, the connection must have autocommit
+    // disabled for a non-zero fetchsize to take effect; otherwise the driver fetches and
+    // caches all rows at once instead of streaming them.
+    //
+    // See: https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
+    //
+    if (properties.getOrElse("fetchsize", "0").toInt > 0) {
+      connection.setAutoCommit(false)
+    }
+
+  }
 }
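From the user side, the practical effect is that a Postgres read with a positive fetchsize now streams rows through a cursor instead of materializing the whole result set in the driver. A sketch of how that option flows into the properties map beforeFetch receives (the URL and table name are placeholders):

    val df = sqlContext.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost/testdb")
      .option("dbtable", "big_table")
      .option("fetchsize", "1000")  // > 0, so PostgresDialect.beforeFetch disables autocommit
      .load()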