Commit 24caed6

adrian-ionescu authored and yhuai committed
[SC-6101][REDSHIFT] Fix auto-enable SSL feature for Postgres driver
Integration test "test.redshift", part of "dogfood_notebook_tests", was failing for Spark branch-2.1 with an SSL-related error.

Root cause:
- The Postgres JDBC driver expects all-lowercase options in the URL, while we were providing camelCase ones (e.g. _sslRootCert_ as opposed to _sslrootcert_).

Why this wasn't caught by Spark-side integration tests:
- All tests use the "redshift" subprotocol, which asks for the Redshift driver.
- The Postgres integration tests don't involve Redshift and therefore don't exercise this feature.
- Even after manually changing the subprotocol to "postgresql", the Redshift driver was still being picked up by _Class.forName("org.postgresql.Driver")_ because it was present in the classpath with higher priority.

Why the intended way of disabling this feature didn't work:
- Per the Postgres JDBC docs, _&ssl=false_ in fact has the exact opposite effect: it enables SSL encryption (!)

## What changes were proposed in this pull request?
- Change SSL-related options to lowercase.
- Introduce the DataFrame reader option "autoenablessl" for disabling the feature.
- Extend the Redshift SSL integration test suite to verify that the new flag works.

## How was this patch tested?
- _bazel run //spark/images:2.1.x-scala2.10_dogfood_notebook_tests_: https://dogfood.staging.cloud.databricks.com/#job/186727/run/1
- Existing _redshift-integration-tests_
- New integration test

## To do in a separate PR
- Make _redshift-integration-tests_ also exercise the Postgres driver that we actually bundle.

Author: Adrian Ionescu <[email protected]>

Closes apache#271 from adrian-ionescu/redshift-ssl-SC-6101.
1 parent 0985111 commit 24caed6
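
For context, a minimal sketch of how the new reader option is meant to be used; the session, URL, and credential values below are placeholders, not taken from this commit:

// Hedged usage sketch: opting out of the auto-SSL feature introduced here.
// "autoenablessl" defaults to "true"; spark, jdbcUrl, user and password are
// hypothetical placeholders, and options required for a real job (e.g. tempdir)
// are omitted for brevity.
val df = spark.read
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl) // e.g. jdbc:redshift://host:5439/database
  .option("user", user)
  .option("password", password)
  .option("autoenablessl", "false") // skip automatic sslmode=verify-full setup
  .option("query", "select 42")
  .load()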

File tree

8 files changed (+109 / -49 lines)


external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/AWSCredentialsInUriIntegrationSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -42,12 +42,12 @@ class AWSCredentialsInUriIntegrationSuite extends IntegrationSuiteBase {
     assert(!AWS_SECRET_ACCESS_KEY.contains("/"), "AWS secret key should not contain slash")
     sc = new SparkContext("local", getClass.getSimpleName)
     /* Have to create this schema here, because default read/write from base class use it. */
-    val schemaConn = DefaultJDBCWrapper.getConnector(None, jdbcUrl, None)
+    val schemaConn = DefaultJDBCWrapper.getConnector(None, jdbcUrl, None, true)
     schemaConn.createStatement().executeUpdate(s"create schema if not exists $schemaName")
     schemaConn.commit()
     schemaConn.close()
     conn = DefaultJDBCWrapper.getConnector(None,
-      jdbcUrl, None, Some(s"$schemaName, '$$user', public"))
+      jdbcUrl, None, true, Some(s"$schemaName, '$$user', public"))
   }

   test("roundtrip save and load") {

external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/IntegrationSuiteBase.scala

Lines changed: 1 addition & 1 deletion
@@ -91,7 +91,7 @@ trait IntegrationSuiteBase
     sc.hadoopConfiguration.setBoolean("fs.s3n.impl.disable.cache", true)
     sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY_ID)
     sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", AWS_SECRET_ACCESS_KEY)
-    conn = DefaultJDBCWrapper.getConnector(None, jdbcUrl, None)
+    conn = DefaultJDBCWrapper.getConnector(None, jdbcUrl, None, true)
     conn.setAutoCommit(true)
     jdbcUpdate(s"create schema if not exists $schemaName")
     jdbcUpdate(s"set search_path to $schemaName, '$$user', public")

external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/PostgresDriverIntegrationSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ class PostgresDriverIntegrationSuite extends IntegrationSuiteBase {
   }

   test("postgresql driver takes precedence for jdbc:postgresql:// URIs") {
-    val conn = DefaultJDBCWrapper.getConnector(None, jdbcUrl, None)
+    val conn = DefaultJDBCWrapper.getConnector(None, jdbcUrl, None, true)
     try {
       // TODO(josh): this is slightly different than what was done in open-source spark-redshift.
       // This is due to conflicting PG driver being pulled in via transitive Spark test deps.

external/redshift-integration-tests/src/test/scala/com/databricks/spark/redshift/RedshiftSSLIntegrationSuite.scala

Lines changed: 64 additions & 16 deletions
@@ -9,29 +9,77 @@

 package com.databricks.spark.redshift

+import java.io.{File, FileInputStream, FileOutputStream}
 import java.sql.SQLException

+import org.apache.commons.io.IOUtils
+
 import org.apache.spark.tags.ExtendedRedshiftTest

 @ExtendedRedshiftTest
 class RedshiftSSLIntegrationSuite extends IntegrationSuiteBase {

-  test("SSL is not auto-configured if SSL options are set in the JDBC URI") {
-    withTempRedshiftTable("jdbc_url_ssl_options_take_precedence_2") { tableName =>
-      // If the user specifies SSL options in the URL then this takes precedence.
-      // In the following test, the user-specified options will not work because
-      // the sslRootCert is not specified and the Amazon certificate is assumed to
-      // not be in the system truststore. Therefore, we expect this write to fail:
-      val e = intercept[SQLException] {
-        write(sqlContext.range(10))
-          .option("user", AWS_REDSHIFT_USER)
-          .option("password", AWS_REDSHIFT_PASSWORD)
-          .option("url",
-            s"$AWS_REDSHIFT_JDBC_URL?&ssl=true&sslMode=verify-full")
-          .option("dbtable", tableName)
-          .save()
-      }
-      assert(e.getMessage.contains("General SSLEngine problem"))
+  def testThatItWorks(url: String, autoSslOption: Option[Boolean]): Unit = {
+    val dfReader = read
+      .option("url", url)
+      .option("user", AWS_REDSHIFT_USER)
+      .option("password", AWS_REDSHIFT_PASSWORD)
+      .option("query", "select 42")
+
+    val finalDfReader = autoSslOption match {
+      case Some(flag) => dfReader.option("autoenablessl", flag.toString)
+      case _ => dfReader
     }
+    val ret = finalDfReader.load().collect()
+    assert(ret.size == 1 && ret(0)(0) == 42)
+  }
+
+  def testThatItFails(url: String, autoSslOption: Option[Boolean], errorMessage: String): Unit = {
+    val e = intercept[SQLException] {
+      testThatItWorks(url, autoSslOption)
+    }
+    assert(e.getMessage.contains(errorMessage))
+  }
+
+  val generalSslProblem = "General SSLEngine problem"
+  val errorImportingCertificate = "Error importing certificate in to truststore."
+
+  test("Simple test") {
+    testThatItWorks(AWS_REDSHIFT_JDBC_URL, None)
+    testThatItWorks(AWS_REDSHIFT_JDBC_URL, Some(false))
+    testThatItWorks(AWS_REDSHIFT_JDBC_URL, Some(true))
+  }
+
+  test("SSL is not auto-configured if SSL options are set in the JDBC URI") {
+    // If the user specifies SSL options in the URL then these take precedence.
+    // In the following test, the user-specified options will not work because
+    // sslRootCert is not specified and the Amazon certificate is assumed to
+    // not be in the system truststore. Therefore, we expect these queries to fail:
+    val url = s"$AWS_REDSHIFT_JDBC_URL?ssl=true&sslMode=verify-full"
+
+    testThatItFails(url, None, generalSslProblem)
+    testThatItFails(url, Some(true), generalSslProblem)
+    testThatItFails(url, Some(false), generalSslProblem)
+  }
+
+  test("Make sure feature flag works in case of issues") {
+    // Simulate a problem/bug that's related to this feature by
+    // deleting the server certificate file.
+    val existingCert = DefaultJDBCWrapper.redshiftSslCert
+    assert(existingCert.exists())
+    val certBackup = File.createTempFile("redshift-ssl-ca-cert", ".bkp")
+    IOUtils.copy(new FileInputStream(existingCert), new FileOutputStream(certBackup))
+    existingCert.delete()
+
+    testThatItFails(AWS_REDSHIFT_JDBC_URL, None, errorImportingCertificate)
+    testThatItFails(AWS_REDSHIFT_JDBC_URL, Some(true), errorImportingCertificate)
+
+    // Disabling the feature should work around the issue and allow the user to run their query.
+    testThatItWorks(AWS_REDSHIFT_JDBC_URL, Some(false))
+
+    // Restore the file and check that it all works again.
+    IOUtils.copy(new FileInputStream(certBackup), new FileOutputStream(existingCert))
+    testThatItWorks(AWS_REDSHIFT_JDBC_URL, None)
+    testThatItWorks(AWS_REDSHIFT_JDBC_URL, Some(true))
   }
 }
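
One note on the new feature-flag test: it deletes the certificate file and restores it only at the end, so a failure partway through would leave the file missing for subsequent suites. A more defensive variant, sketched here with the suite's own helpers, restores it in a finally block:

// Sketch only: same steps as the test above, but the restore is guaranteed
// to run even if one of the intermediate assertions throws.
val existingCert = DefaultJDBCWrapper.redshiftSslCert
val certBackup = File.createTempFile("redshift-ssl-ca-cert", ".bkp")
IOUtils.copy(new FileInputStream(existingCert), new FileOutputStream(certBackup))
try {
  existingCert.delete()
  testThatItFails(AWS_REDSHIFT_JDBC_URL, None, errorImportingCertificate)
  testThatItWorks(AWS_REDSHIFT_JDBC_URL, Some(false))
} finally {
  IOUtils.copy(new FileInputStream(certBackup), new FileOutputStream(existingCert))
}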

external/redshift/src/main/scala/com/databricks/spark/redshift/DefaultSource.scala

Lines changed: 1 addition & 2 deletions
@@ -71,8 +71,7 @@ class DefaultSource(
   }

   def tableExists: Boolean = {
-    val conn = jdbcWrapper.getConnector(params.jdbcDriver,
-      params.jdbcUrl, params.credentials, params.searchPath)
+    val conn = jdbcWrapper.getConnector(params)
     try {
       jdbcWrapper.tableExists(conn, table.toString)
     } finally {

external/redshift/src/main/scala/com/databricks/spark/redshift/Parameters.scala

Lines changed: 9 additions & 1 deletion
@@ -32,7 +32,8 @@ private[redshift] object Parameters {
     "usestagingtable" -> "true",
     "preactions" -> ";",
     "postactions" -> ";",
-    "singleoutput" -> "false"
+    "singleoutput" -> "false",
+    "autoenablessl" -> "true"
   )

   val VALID_TEMP_FORMATS = Set("AVRO", "CSV", "CSV GZIP")
@@ -290,6 +291,13 @@ private[redshift] object Parameters {
    */
   def forwardSparkS3Credentials: Boolean = parameters("forward_spark_s3_credentials").toBoolean

+  /**
+   * If true, this library will automatically enable full SSL encryption (sslmode=verify-full)
+   * for the JDBC connection, using a dynamically downloaded server certificate from Amazon, or
+   * a pre-bundled one as a backup in case the download fails.
+   */
+  def autoEnableSSL: Boolean = parameters("autoenablessl").toBoolean
+
   /**
    * Temporary AWS credentials which are passed to Redshift. These only need to be supplied by
    * the user when Hadoop is configured to authenticate to S3 via IAM roles assigned to EC2
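
The new key follows the existing Parameters.scala pattern: defaults are merged underneath user-supplied options, so "autoenablessl" stays "true" unless the caller overrides it. A simplified, self-contained sketch of that merge (names abbreviated, not the literal implementation):

// Simplified sketch of the default-merging pattern; user options win.
val defaultParameters: Map[String, String] = Map(
  "singleoutput" -> "false",
  "autoenablessl" -> "true"
)

def mergeParameters(userParameters: Map[String, String]): Map[String, String] =
  defaultParameters ++ userParameters

// The feature is on by default and can be switched off per read/write:
assert(mergeParameters(Map.empty)("autoenablessl").toBoolean)
assert(!mergeParameters(Map("autoenablessl" -> "false"))("autoenablessl").toBoolean)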

external/redshift/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala

Lines changed: 30 additions & 25 deletions
@@ -50,7 +50,7 @@ private[redshift] class JDBCWrapper {
     ExecutionContext.fromExecutorService(Executors.newCachedThreadPool(threadFactory))
   }

-  private lazy val redshiftSslCert: File = {
+  private[redshift] lazy val redshiftSslCert: File = {
     val outFile = File.createTempFile("redshift-ssl-ca-cert", ".tmp")
     outFile.deleteOnExit()
     try {
@@ -213,12 +213,14 @@ private[redshift] class JDBCWrapper {
    * discover the appropriate driver class.
    * @param url the JDBC url to connect to.
    * @param credentials User credentials
+   * @param autoEnableSSL Whether or not full SSL encryption should be automatically enabled.
    * @param schemaSearchPath Schema search_path to use.
    */
   def getConnector(
       userProvidedDriverClass: Option[String],
       url: String,
       credentials: Option[(String, String)],
+      autoEnableSSL: Boolean,
       schemaSearchPath: Option[String] = None) : Connection = {
     val subprotocol = url.stripPrefix("jdbc:").split(":")(0)
     val driverClass: String = getDriverClass(subprotocol, userProvidedDriverClass)
@@ -250,33 +252,35 @@ private[redshift] class JDBCWrapper {
       properties.setProperty("password", password)
     }

-    // We enable SSL by default, unless the user provides any explicit SSL-related settings.
-    if (!(url.contains("?ssl") || url.contains("&ssl"))) {
-      val driverVersion = Utils.classForName(driverClass).getPackage.getImplementationVersion
-      if (driverClass.contains("redshift") &&
-          Utils.compareVersions(driverVersion, "1.1.17.1017") < 0) {
-        // The Redshift driver only started supporting `sslRootCert` since version "1.1.17".
-        // With older drivers the combination of options below results in an uninformative
-        // `GeneralSSLEngine` error.
-        // scalastyle:off
-        throw new RuntimeException(
-          s"""Old version of Redshift JDBC driver detected ($driverVersion), which does not support
-             |the `sslRootCert` option that's needed for auto-enabling full SSL encryption.
-             |The `sslRootCert` option is only supported in versions 1.1.17 and higher.
+    if (autoEnableSSL) {
+      // Auto-enable full SSL encryption, unless the user provides any explicit SSL-related settings
+      if (!(url.contains("?ssl") || url.contains("&ssl"))) {
+        val driverVersion = Utils.classForName(driverClass).getPackage.getImplementationVersion
+        if (driverClass.contains("redshift") &&
+            Utils.compareVersions(driverVersion, "1.1.17.1017") < 0) {
+          // The Redshift driver only started supporting `sslRootCert` since version "1.1.17".
+          // With older drivers the combination of options below results in an uninformative
+          // `GeneralSSLEngine` error.
+          // scalastyle:off
+          throw new RuntimeException(
+            s"""Old version of Redshift JDBC driver detected ($driverVersion), which does not support
+               |the `sslrootcert` option that's needed for auto-enabling full SSL encryption.
+               |The `sslrootcert` option is only supported in versions 1.1.17 and higher.
               |See https://s3.amazonaws.com/redshift-downloads/drivers/Amazon+Redshift+JDBC+Release+Notes.pdf
-             |If you're willing to proceed without SSL, then explicitly disable it by adding
-             |`ssl=false` to your JDBC URL.
+               |To work around this you can provide your own SSL config options as part of the JDBC url,
+               |or simply disable this feature via .option("autoenablessl", "false").
             """.stripMargin)
-        // scalastyle:on
+          // scalastyle:on
+        } else {
+          log.info("Auto-enabling full SSL encryption for JDBC connection to Redshift")
+          properties.setProperty("ssl", "true")
+          properties.setProperty("sslmode", "verify-full")
+          properties.setProperty("sslrootcert", redshiftSslCert.toString)
+        }
       } else {
-        log.info("Auto-enabling full SSL encryption for JDBC connection to Redshift")
-        properties.setProperty("ssl", "true")
-        properties.setProperty("sslMode", "verify-full")
-        properties.setProperty("sslRootCert", redshiftSslCert.toString)
+        log.info("Not auto-enabling full SSL encryption for JDBC connection to Redshift because " +
+          "explicit SSL-related options were detected in the JDBC URL")
       }
-    } else {
-      log.info("Not auto-enabling full SSL encryption for JDBC connection to Redshift because " +
-        "explicit SSL-related options were detected in the JDBC URL")
     }

     val conn = driver.connect(url, properties)
@@ -299,7 +303,8 @@ private[redshift] class JDBCWrapper {
     val url = params.jdbcUrl
     val credentials = params.credentials
     val searchPath = params.searchPath
-    getConnector(driverClass, url, credentials, searchPath)
+    val autoEnableSSL = params.autoEnableSSL
+    getConnector(driverClass, url, credentials, autoEnableSSL, searchPath)
   }

   /**
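
The heart of the fix is visible in the property keys above: "sslmode" and "sslrootcert" replace the camelCase "sslMode" and "sslRootCert", because the Postgres JDBC driver only recognizes all-lowercase option names. A minimal sketch of the corrected connection properties; the URL and certificate path are placeholders:

import java.util.Properties

// Sketch: all-lowercase keys are understood by both the Redshift and Postgres
// JDBC drivers, whereas the old camelCase keys were silently ignored by the
// Postgres driver, leaving the SSL handshake misconfigured.
val properties = new Properties()
properties.setProperty("ssl", "true")
properties.setProperty("sslmode", "verify-full")                // previously "sslMode"
properties.setProperty("sslrootcert", "/path/to/amazon-ca.pem") // previously "sslRootCert"
// val conn = java.sql.DriverManager.getConnection("jdbc:postgresql://host/db", properties)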

external/redshift/src/test/scala/com/databricks/spark/redshift/MockRedshift.scala

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ class MockRedshift(
     override def answer(invocation: InvocationOnMock): Connection = createMockConnection()
   }).when(jdbcWrapper)
     .getConnector(any[Option[String]](),
-      same(jdbcUrl), any[Option[(String, String)]](), any[Option[String]]())
+      same(jdbcUrl), any[Option[(String, String)]](), any[Boolean], any[Option[String]]())

   doAnswer(new Answer[Boolean] {
     override def answer(invocation: InvocationOnMock): Boolean = {
