From 095b2c9127b161437013476d48d855714716449f Mon Sep 17 00:00:00 2001 From: Stephen Boesch Date: Tue, 5 Aug 2014 12:46:55 -0700 Subject: [PATCH 1/4] Fix tiny bug (likely copy and paste error) in closing jdbc connection --- core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala index a76a070b5b863..07dd496588a5e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala @@ -106,7 +106,7 @@ class JdbcRDD[T: ClassTag]( case e: Exception => logWarning("Exception closing statement", e) } try { - if (null != conn && ! stmt.isClosed()) conn.close() + if (null != conn && ! conn.isClosed()) conn.close() logInfo("closed connection") } catch { case e: Exception => logWarning("Exception closing connection", e) From 3fb23ed1ecdedc8ce242d50307fc6c3db0b67684 Mon Sep 17 00:00:00 2001 From: Stephen Boesch Date: Tue, 5 Aug 2014 13:25:17 -0700 Subject: [PATCH 2/4] SPARK-2689 Fix potential leak of connection/PreparedStatement in case of error in JdbcRDD --- .../scala/org/apache/spark/rdd/JdbcRDD.scala | 39 ++++++++++++------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala index 07dd496588a5e..d5738cc2da869 100644 --- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.rdd -import java.sql.{Connection, ResultSet} +import java.sql.{PreparedStatement, Connection, ResultSet} import scala.reflect.ClassTag @@ -70,20 +70,30 @@ class JdbcRDD[T: ClassTag]( override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] { context.addOnCompleteCallback{ () => closeIfNeeded() } val part = thePart.asInstanceOf[JdbcPartition] - val conn = getConnection() - val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) + var conn : Connection = _ + var stmt : PreparedStatement = _ + try { + conn = getConnection() + stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) - // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results, - // rather than pulling entire resultset into memory. - // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html - if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) { - stmt.setFetchSize(Integer.MIN_VALUE) - logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ") - } + // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results, + // rather than pulling entire resultset into memory. + // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html + if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) { + stmt.setFetchSize(Integer.MIN_VALUE) + logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ") + } - stmt.setLong(1, part.lower) - stmt.setLong(2, part.upper) - val rs = stmt.executeQuery() + stmt.setLong(1, part.lower) + stmt.setLong(2, part.upper) + val rs = stmt.executeQuery() + + } catch { + case e: Exception => + close() + logError("Exception occurred on creating connection/preparedStatement", e) + throw e // Is it correct to throw Exception, or what is preferred cleanup here? + } override def getNext: T = { if (rs.next()) { @@ -106,7 +116,7 @@ class JdbcRDD[T: ClassTag]( case e: Exception => logWarning("Exception closing statement", e) } try { - if (null != conn && ! conn.isClosed()) conn.close() + if (null != conn && ! stmt.isClosed()) conn.close() logInfo("closed connection") } catch { case e: Exception => logWarning("Exception closing connection", e) @@ -120,3 +130,4 @@ object JdbcRDD { Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1)) } } + From 6518d3608daf79241bebc7d68699d81feb90932f Mon Sep 17 00:00:00 2001 From: Stephen Boesch Date: Tue, 5 Aug 2014 14:22:09 -0700 Subject: [PATCH 3/4] SPARK-2689 Fix tiny bug in JdbcRdd for closing jdbc connection --- .../scala/org/apache/spark/rdd/JdbcRDD.scala | 38 +++++++------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala index d5738cc2da869..6b205915bd43b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.rdd -import java.sql.{PreparedStatement, Connection, ResultSet} +import java.sql.{Connection, ResultSet} import scala.reflect.ClassTag @@ -70,31 +70,21 @@ class JdbcRDD[T: ClassTag]( override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] { context.addOnCompleteCallback{ () => closeIfNeeded() } val part = thePart.asInstanceOf[JdbcPartition] - var conn : Connection = _ - var stmt : PreparedStatement = _ - try { - conn = getConnection() - stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) + val conn = getConnection() + val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) - // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results, - // rather than pulling entire resultset into memory. - // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html - if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) { - stmt.setFetchSize(Integer.MIN_VALUE) - logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ") - } - - stmt.setLong(1, part.lower) - stmt.setLong(2, part.upper) - val rs = stmt.executeQuery() - - } catch { - case e: Exception => - close() - logError("Exception occurred on creating connection/preparedStatement", e) - throw e // Is it correct to throw Exception, or what is preferred cleanup here? + // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results, + // rather than pulling entire resultset into memory. + // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html + if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) { + stmt.setFetchSize(Integer.MIN_VALUE) + logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ") } + stmt.setLong(1, part.lower) + stmt.setLong(2, part.upper) + val rs = stmt.executeQuery() + override def getNext: T = { if (rs.next()) { mapRow(rs) @@ -116,7 +106,7 @@ class JdbcRDD[T: ClassTag]( case e: Exception => logWarning("Exception closing statement", e) } try { - if (null != conn && ! stmt.isClosed()) conn.close() + if (null != conn && ! conn.isClosed()) conn.close() logInfo("closed connection") } catch { case e: Exception => logWarning("Exception closing connection", e) From 363be4ff8b111c1a460ac3299f6c8ccae4df7cb4 Mon Sep 17 00:00:00 2001 From: Stephen Boesch Date: Tue, 5 Aug 2014 16:48:00 -0700 Subject: [PATCH 4/4] SPARK-2869 - Fix tiny bug in JdbcRdd for closing jdbc connection (reformat with braces) --- .../main/scala/org/apache/spark/rdd/JdbcRDD.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala index 6b205915bd43b..8947e66f4577c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala @@ -96,17 +96,23 @@ class JdbcRDD[T: ClassTag]( override def close() { try { - if (null != rs && ! rs.isClosed()) rs.close() + if (null != rs && ! rs.isClosed()) { + rs.close() + } } catch { case e: Exception => logWarning("Exception closing resultset", e) } try { - if (null != stmt && ! stmt.isClosed()) stmt.close() + if (null != stmt && ! stmt.isClosed()) { + stmt.close() + } } catch { case e: Exception => logWarning("Exception closing statement", e) } try { - if (null != conn && ! conn.isClosed()) conn.close() + if (null != conn && ! conn.isClosed()) { + conn.close() + } logInfo("closed connection") } catch { case e: Exception => logWarning("Exception closing connection", e)