From 4b82a2cb92722c27fa958412f578f5e659319de0 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 11 Aug 2015 15:00:23 +0800 Subject: [PATCH 01/19] Import yjshen's PR. --- sql/core/pom.xml | 12 ++ .../apache/spark/sql/jdbc/DockerHacks.scala | 166 ++++++++++++++++++ .../sql/jdbc/MySQLIntegrationSuite.scala | 146 +++++++++++++++ .../sql/jdbc/PostgresIntegrationSuite.scala | 75 ++++++++ 4 files changed, 399 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 349007789f63..fe15e4709dab 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -111,6 +111,18 @@ mockito-core test + + com.spotify + docker-client + 2.7.5 + test + + + guava + com.google.guava + + + target/scala-${scala.binary.version}/classes diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala new file mode 100644 index 000000000000..41c6f7ade3c6 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.jdbc + +import java.sql.Connection + +import scala.collection.JavaConverters._ +import scala.collection.mutable.MutableList + +import com.spotify.docker.client.messages.ContainerConfig +import com.spotify.docker.client._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.test.SharedSQLContext +import org.scalatest.BeforeAndAfterAll + +abstract class DatabaseOnDocker { + /** + * The docker image to be pulled + */ + def imageName: String + + /** + * A Seq of environment variables in the form of VAR=value + */ + def env: Seq[String] + + /** + * jdbcUrl should be a lazy val or a function since `ip` it relies on is only available after + * the docker container starts + */ + def jdbcUrl: String + + private val docker: DockerClient = DockerClientFactory.get() + private var containerId: String = null + + lazy val ip = docker.inspectContainer(containerId).networkSettings.ipAddress + + def start(): Unit = { + while (true) { + try { + val config = ContainerConfig.builder() + .image(imageName).env(env.asJava) + .build() + containerId = docker.createContainer(config).id + docker.startContainer(containerId) + return + } catch { + case e: ImageNotFoundException => retry(5)(docker.pull(imageName)) + } + } + } + + private def retry[T](n: Int)(fn: => T): T = { + try { + fn + } catch { + case e if n > 1 => + retry(n - 1)(fn) + } + } + + def close(): Unit = { + docker.killContainer(containerId) + docker.removeContainer(containerId) + DockerClientFactory.close(docker) + } +} + +abstract class DatabaseIntegrationSuite extends SparkFunSuite + with BeforeAndAfterAll with SharedSQLContext { + + def db: DatabaseOnDocker + + def waitForDatabase(ip: String, maxMillis: Long) { + val before = System.currentTimeMillis() + var lastException: java.sql.SQLException = null + while (true) { + if (System.currentTimeMillis() > before + maxMillis) { + throw new java.sql.SQLException(s"Database not up after $maxMillis ms.", lastException) + } + try { + val conn = java.sql.DriverManager.getConnection(db.jdbcUrl) + conn.close() + return + } catch { + case e: java.sql.SQLException => + lastException = e + java.lang.Thread.sleep(250) + } + } + } + + def setupDatabase(ip: String): Unit = { + val conn: Connection = java.sql.DriverManager.getConnection(db.jdbcUrl) + try { + dataPreparation(conn) + } finally { + conn.close() + } + } + + /** + * Prepare databases and tables for testing + */ + def dataPreparation(connection: Connection) + + override def beforeAll() { + super.beforeAll() + db.start() + waitForDatabase(db.ip, 60000) + setupDatabase(db.ip) + } + + override def afterAll() { + try { + db.close() + } finally { + super.afterAll() + } + } +} + +/** + * A factory and morgue for DockerClient objects. In the DockerClient we use, + * calling close() closes the desired DockerClient but also renders all other + * DockerClients inoperable. This is inconvenient if we have more than one + * open, such as during tests. 
+ */ +object DockerClientFactory { + var numClients: Int = 0 + val zombies = new MutableList[DockerClient]() + + def get(): DockerClient = { + this.synchronized { + numClients = numClients + 1 + DefaultDockerClient.fromEnv.build() + } + } + + def close(dc: DockerClient) { + this.synchronized { + numClients = numClients - 1 + zombies += dc + if (numClients == 0) { + zombies.foreach(_.close()) + zombies.clear() + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala new file mode 100644 index 000000000000..b295894c8097 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.jdbc + +import java.math.BigDecimal +import java.sql.{Connection, Date, Timestamp} +import java.util.Properties + +class MySQLIntegrationSuite extends DatabaseIntegrationSuite { + val db = new DatabaseOnDocker { + val imageName = "mysql:latest" + val env = Seq("MYSQL_ROOT_PASSWORD=rootpass") + lazy val jdbcUrl = s"jdbc:mysql://$ip:3306/mysql?user=root&password=rootpass" + } + + override def dataPreparation(conn: Connection) { + conn.prepareStatement("CREATE DATABASE foo").executeUpdate() + conn.prepareStatement("CREATE TABLE tbl (x INTEGER, y TEXT(8))").executeUpdate() + conn.prepareStatement("INSERT INTO tbl VALUES (42,'fred')").executeUpdate() + conn.prepareStatement("INSERT INTO tbl VALUES (17,'dave')").executeUpdate() + + conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits BIT(10), " + + "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci DECIMAL(40,20), flt FLOAT, " + + "dbl DOUBLE)").executeUpdate() + conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', " + + "17, 77777, 123456789, 123456789012345, 123456789012345.123456789012345, " + + "42.75, 1.0000000000000002)").executeUpdate() + + conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt DATETIME, ts TIMESTAMP, " + + "yr YEAR)").executeUpdate() + conn.prepareStatement("INSERT INTO dates VALUES ('1991-11-09', '13:31:24', " + + "'1996-01-01 01:23:45', '2009-02-13 23:31:30', '2001')").executeUpdate() + + // TODO: Test locale conversion for strings. 
+ conn.prepareStatement("CREATE TABLE strings (a CHAR(10), b VARCHAR(10), c TINYTEXT, " + + "d TEXT, e MEDIUMTEXT, f LONGTEXT, g BINARY(4), h VARBINARY(10), i BLOB)" + ).executeUpdate() + conn.prepareStatement("INSERT INTO strings VALUES ('the', 'quick', 'brown', 'fox', " + + "'jumps', 'over', 'the', 'lazy', 'dog')").executeUpdate() + } + + test("Basic test") { + val df = sqlContext.read.jdbc(db.jdbcUrl, "tbl", new Properties) + val rows = df.collect() + assert(rows.length == 2) + val types = rows(0).toSeq.map(x => x.getClass.toString) + assert(types.length == 2) + assert(types(0).equals("class java.lang.Integer")) + assert(types(1).equals("class java.lang.String")) + } + + test("Numeric types") { + val df = sqlContext.read.jdbc(db.jdbcUrl, "numbers", new Properties) + val rows = df.collect() + assert(rows.length == 1) + val types = rows(0).toSeq.map(x => x.getClass.toString) + assert(types.length == 9) + assert(types(0).equals("class java.lang.Boolean")) + assert(types(1).equals("class java.lang.Long")) + assert(types(2).equals("class java.lang.Integer")) + assert(types(3).equals("class java.lang.Integer")) + assert(types(4).equals("class java.lang.Integer")) + assert(types(5).equals("class java.lang.Long")) + assert(types(6).equals("class java.math.BigDecimal")) + assert(types(7).equals("class java.lang.Double")) + assert(types(8).equals("class java.lang.Double")) + assert(rows(0).getBoolean(0) == false) + assert(rows(0).getLong(1) == 0x225) + assert(rows(0).getInt(2) == 17) + assert(rows(0).getInt(3) == 77777) + assert(rows(0).getInt(4) == 123456789) + assert(rows(0).getLong(5) == 123456789012345L) + val bd = new BigDecimal("123456789012345.12345678901234500000") + assert(rows(0).getAs[BigDecimal](6).equals(bd)) + assert(rows(0).getDouble(7) == 42.75) + assert(rows(0).getDouble(8) == 1.0000000000000002) + } + + test("Date types") { + val df = sqlContext.read.jdbc(db.jdbcUrl, "dates", new Properties) + val rows = df.collect() + assert(rows.length == 1) + val types = rows(0).toSeq.map(x => x.getClass.toString) + assert(types.length == 5) + assert(types(0).equals("class java.sql.Date")) + assert(types(1).equals("class java.sql.Timestamp")) + assert(types(2).equals("class java.sql.Timestamp")) + assert(types(3).equals("class java.sql.Timestamp")) + assert(types(4).equals("class java.sql.Date")) + assert(rows(0).getAs[Date](0).equals(Date.valueOf("1991-11-09"))) + assert(rows(0).getAs[Timestamp](1).equals(Timestamp.valueOf("1970-01-01 13:31:24"))) + assert(rows(0).getAs[Timestamp](2).equals(Timestamp.valueOf("1996-01-01 01:23:45"))) + assert(rows(0).getAs[Timestamp](3).equals(Timestamp.valueOf("2009-02-13 23:31:30"))) + assert(rows(0).getAs[Date](4).equals(Date.valueOf("2001-01-01"))) + } + + test("String types") { + val df = sqlContext.read.jdbc(db.jdbcUrl, "strings", new Properties) + val rows = df.collect() + assert(rows.length == 1) + val types = rows(0).toSeq.map(x => x.getClass.toString) + assert(types.length == 9) + assert(types(0).equals("class java.lang.String")) + assert(types(1).equals("class java.lang.String")) + assert(types(2).equals("class java.lang.String")) + assert(types(3).equals("class java.lang.String")) + assert(types(4).equals("class java.lang.String")) + assert(types(5).equals("class java.lang.String")) + assert(types(6).equals("class [B")) + assert(types(7).equals("class [B")) + assert(types(8).equals("class [B")) + assert(rows(0).getString(0).equals("the")) + assert(rows(0).getString(1).equals("quick")) + assert(rows(0).getString(2).equals("brown")) + 
assert(rows(0).getString(3).equals("fox")) + assert(rows(0).getString(4).equals("jumps")) + assert(rows(0).getString(5).equals("over")) + assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](6), Array[Byte](116, 104, 101, 0))) + assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](7), Array[Byte](108, 97, 122, 121))) + assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](8), Array[Byte](100, 111, 103))) + } + + test("Basic write test") { + val df1 = sqlContext.read.jdbc(db.jdbcUrl, "numbers", new Properties) + val df2 = sqlContext.read.jdbc(db.jdbcUrl, "dates", new Properties) + val df3 = sqlContext.read.jdbc(db.jdbcUrl, "strings", new Properties) + df1.write.jdbc(db.jdbcUrl, "numberscopy", new Properties) + df2.write.jdbc(db.jdbcUrl, "datescopy", new Properties) + df3.write.jdbc(db.jdbcUrl, "stringscopy", new Properties) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala new file mode 100644 index 000000000000..02d2a25d20b0 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.jdbc + +import java.sql.Connection +import java.util.Properties + +class PostgresIntegrationSuite extends DatabaseIntegrationSuite { + val db = new DatabaseOnDocker { + val imageName = "postgres:latest" + val env = Seq("POSTGRES_PASSWORD=rootpass") + lazy val jdbcUrl = s"jdbc:postgresql://$ip:5432/postgres?user=postgres&password=rootpass" + } + + override def dataPreparation(conn: Connection) { + conn.prepareStatement("CREATE DATABASE foo").executeUpdate() + conn.setCatalog("foo") + conn.prepareStatement("CREATE TABLE bar (a text, b integer, c double precision, d bigint, " + + "e bit(1), f bit(10), g bytea, h boolean, i inet, j cidr)").executeUpdate() + conn.prepareStatement("INSERT INTO bar VALUES ('hello', 42, 1.25, 123456789012345, B'0', " + + "B'1000100101', E'\\\\xDEADBEEF', true, '172.16.0.42', '192.168.0.0/16')").executeUpdate() + } + + test("Type mapping for various types") { + val df = sqlContext.read.jdbc(db.jdbcUrl, "bar", new Properties) + val rows = df.collect() + assert(rows.length == 1) + val types = rows(0).toSeq.map(x => x.getClass.toString) + assert(types.length == 10) + assert(types(0).equals("class java.lang.String")) + assert(types(1).equals("class java.lang.Integer")) + assert(types(2).equals("class java.lang.Double")) + assert(types(3).equals("class java.lang.Long")) + assert(types(4).equals("class java.lang.Boolean")) + assert(types(5).equals("class [B")) + assert(types(6).equals("class [B")) + assert(types(7).equals("class java.lang.Boolean")) + assert(types(8).equals("class java.lang.String")) + assert(types(9).equals("class java.lang.String")) + assert(rows(0).getString(0).equals("hello")) + assert(rows(0).getInt(1) == 42) + assert(rows(0).getDouble(2) == 1.25) + assert(rows(0).getLong(3) == 123456789012345L) + assert(rows(0).getBoolean(4) == false) + // BIT(10)'s come back as ASCII strings of ten ASCII 0's and 1's... + assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](5), + Array[Byte](49, 48, 48, 48, 49, 48, 48, 49, 48, 49))) + assert(java.util.Arrays.equals(rows(0).getAs[Array[Byte]](6), + Array[Byte](0xDE.toByte, 0xAD.toByte, 0xBE.toByte, 0xEF.toByte))) + assert(rows(0).getBoolean(7) == true) + assert(rows(0).getString(8) == "172.16.0.42") + assert(rows(0).getString(9) == "192.168.0.0/16") + } + + test("Basic write test") { + val df = sqlContext.read.jdbc(db.jdbcUrl, "bar", new Properties) + df.write.jdbc(db.jdbcUrl, "public.barcopy", new Properties) + // Test only that it doesn't bomb out. + } +} From eddcc470c91ec3c32d7fa7657786de53cec0324d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 18 Oct 2015 18:02:57 -0700 Subject: [PATCH 02/19] Use newer shaded docker-client artifact --- sql/core/pom.xml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/pom.xml b/sql/core/pom.xml index fe15e4709dab..f29a34a9b970 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -114,7 +114,8 @@ com.spotify docker-client - 2.7.5 + shaded + 2.7.7 test From 8bb62ea45dc9dd88c167f9ba930d0319ec72ddcc Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 18 Oct 2015 18:24:14 -0700 Subject: [PATCH 03/19] Use map for setting environment variables. 
--- .../test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala | 7 ++++--- .../org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala | 4 +++- .../apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 4 +++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala index 41c6f7ade3c6..3544d3d0f948 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala @@ -36,9 +36,9 @@ abstract class DatabaseOnDocker { def imageName: String /** - * A Seq of environment variables in the form of VAR=value + * Environment variables to set inside of the Docker container while lauching it. */ - def env: Seq[String] + def env: Map[String, String] /** * jdbcUrl should be a lazy val or a function since `ip` it relies on is only available after @@ -55,7 +55,8 @@ abstract class DatabaseOnDocker { while (true) { try { val config = ContainerConfig.builder() - .image(imageName).env(env.asJava) + .image(imageName) + .env(env.map { case (k, v) => s"$k=$v" }.toSeq.asJava) .build() containerId = docker.createContainer(config).id docker.startContainer(containerId) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index b295894c8097..5b1fbed484c3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -24,7 +24,9 @@ import java.util.Properties class MySQLIntegrationSuite extends DatabaseIntegrationSuite { val db = new DatabaseOnDocker { val imageName = "mysql:latest" - val env = Seq("MYSQL_ROOT_PASSWORD=rootpass") + val env = Map( + "MYSQL_ROOT_PASSWORD" -> "rootpass" + ) lazy val jdbcUrl = s"jdbc:mysql://$ip:3306/mysql?user=root&password=rootpass" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index 02d2a25d20b0..242e59c5cf33 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -23,7 +23,9 @@ import java.util.Properties class PostgresIntegrationSuite extends DatabaseIntegrationSuite { val db = new DatabaseOnDocker { val imageName = "postgres:latest" - val env = Seq("POSTGRES_PASSWORD=rootpass") + val env = Map( + "POSTGRES_PASSWORD" -> "rootpass" + ) lazy val jdbcUrl = s"jdbc:postgresql://$ip:5432/postgres?user=postgres&password=rootpass" } From 23443a5f708518e9df678b6dd1a14d732d44b872 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 18 Oct 2015 18:24:40 -0700 Subject: [PATCH 04/19] Ensure that Docker container is cleaned up after error. 
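
If beforeAll() fails partway through (image pull, container start, or the
setup queries), the container would otherwise be leaked. The fix is the
usual catch-cleanup-rethrow shape, reduced here to a sketch:

    import scala.util.control.NonFatal

    override def beforeAll() {
      super.beforeAll()
      try {
        // start the container, wait for the database, prepare test data
      } catch {
        case NonFatal(e) =>
          try {
            afterAll()  // best-effort teardown of whatever was started
          } finally {
            throw e     // always surface the original failure
          }
      }
    }
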
--- .../apache/spark/sql/jdbc/DockerHacks.scala | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala index 3544d3d0f948..4c23c10f3fdb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala @@ -20,18 +20,18 @@ package org.apache.spark.sql.jdbc import java.sql.Connection import scala.collection.JavaConverters._ -import scala.collection.mutable.MutableList +import scala.util.control.NonFatal import com.spotify.docker.client.messages.ContainerConfig import com.spotify.docker.client._ +import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkFunSuite import org.apache.spark.sql.test.SharedSQLContext -import org.scalatest.BeforeAndAfterAll abstract class DatabaseOnDocker { /** - * The docker image to be pulled + * The docker image to be pulled. */ def imageName: String @@ -123,9 +123,19 @@ abstract class DatabaseIntegrationSuite extends SparkFunSuite override def beforeAll() { super.beforeAll() - db.start() - waitForDatabase(db.ip, 60000) - setupDatabase(db.ip) + try { + db.start() + waitForDatabase(db.ip, 60000) + setupDatabase(db.ip) + } catch { + case NonFatal(e) => + try { + afterAll() + } finally { + throw e + } + } + } override def afterAll() { From e6c7709cfde47c87f995b7e55786c59360039bf7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 5 Nov 2015 11:48:20 -0800 Subject: [PATCH 05/19] Remove a bunch of layers of indirection / abstraction. --- .../apache/spark/sql/jdbc/DockerHacks.scala | 167 ++++++------------ .../sql/jdbc/MySQLIntegrationSuite.scala | 24 +-- .../sql/jdbc/PostgresIntegrationSuite.scala | 13 +- 3 files changed, 77 insertions(+), 127 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala index 4c23c10f3fdb..5c71345d91b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala @@ -25,6 +25,8 @@ import scala.util.control.NonFatal import com.spotify.docker.client.messages.ContainerConfig import com.spotify.docker.client._ import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.test.SharedSQLContext @@ -33,100 +35,63 @@ abstract class DatabaseOnDocker { /** * The docker image to be pulled. */ - def imageName: String + val imageName: String /** - * Environment variables to set inside of the Docker container while lauching it. + * Environment variables to set inside of the Docker container while launching it. */ - def env: Map[String, String] + val env: Map[String, String] /** - * jdbcUrl should be a lazy val or a function since `ip` it relies on is only available after - * the docker container starts + * Return a JDBC URL that connects to the database running at the given IP address. 
*/ - def jdbcUrl: String + def getJdbcUrl(ip: String): String +} + +abstract class DatabaseIntegrationSuite + extends SparkFunSuite + with BeforeAndAfterAll + with Eventually + with SharedSQLContext { - private val docker: DockerClient = DockerClientFactory.get() - private var containerId: String = null + val db: DatabaseOnDocker - lazy val ip = docker.inspectContainer(containerId).networkSettings.ipAddress + private var docker: DockerClient = _ + private var containerId: String = _ + protected var jdbcUrl: String = _ - def start(): Unit = { - while (true) { + override def beforeAll() { + super.beforeAll() + try { + docker = DefaultDockerClient.fromEnv.build() + // Ensure that the Docker image is installed: try { - val config = ContainerConfig.builder() - .image(imageName) - .env(env.map { case (k, v) => s"$k=$v" }.toSeq.asJava) - .build() - containerId = docker.createContainer(config).id - docker.startContainer(containerId) - return + docker.inspectImage(db.imageName) } catch { - case e: ImageNotFoundException => retry(5)(docker.pull(imageName)) + case e: ImageNotFoundException => + log.warn(s"Docker image ${db.imageName} not found; pulling image from registry") + docker.pull(db.imageName) } - } - } - - private def retry[T](n: Int)(fn: => T): T = { - try { - fn - } catch { - case e if n > 1 => - retry(n - 1)(fn) - } - } - - def close(): Unit = { - docker.killContainer(containerId) - docker.removeContainer(containerId) - DockerClientFactory.close(docker) - } -} - -abstract class DatabaseIntegrationSuite extends SparkFunSuite - with BeforeAndAfterAll with SharedSQLContext { - - def db: DatabaseOnDocker - - def waitForDatabase(ip: String, maxMillis: Long) { - val before = System.currentTimeMillis() - var lastException: java.sql.SQLException = null - while (true) { - if (System.currentTimeMillis() > before + maxMillis) { - throw new java.sql.SQLException(s"Database not up after $maxMillis ms.", lastException) + // Launch the container: + val config = ContainerConfig.builder() + .image(db.imageName) + .env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava) + .build() + containerId = docker.createContainer(config).id + docker.startContainer(containerId) + // Wait until the database has started and is accepting JDBC connections: + jdbcUrl = db.getJdbcUrl(ip = docker.inspectContainer(containerId).networkSettings.ipAddress) + eventually(timeout(60.seconds), interval(1.seconds)) { + val conn = java.sql.DriverManager.getConnection(jdbcUrl) + conn.close() } + // Run any setup queries: + val conn: Connection = java.sql.DriverManager.getConnection(jdbcUrl) try { - val conn = java.sql.DriverManager.getConnection(db.jdbcUrl) + dataPreparation(conn) + } finally { conn.close() - return - } catch { - case e: java.sql.SQLException => - lastException = e - java.lang.Thread.sleep(250) } - } - } - - def setupDatabase(ip: String): Unit = { - val conn: Connection = java.sql.DriverManager.getConnection(db.jdbcUrl) - try { - dataPreparation(conn) - } finally { - conn.close() - } - } - - /** - * Prepare databases and tables for testing - */ - def dataPreparation(connection: Connection) - - override def beforeAll() { - super.beforeAll() - try { - db.start() - waitForDatabase(db.ip, 60000) - setupDatabase(db.ip) } catch { case NonFatal(e) => try { @@ -134,44 +99,28 @@ abstract class DatabaseIntegrationSuite extends SparkFunSuite } finally { throw e } - } - + } } override def afterAll() { try { - db.close() + if (docker != null) { + try { + if (containerId != null) { + docker.killContainer(containerId) + 
docker.removeContainer(containerId) + } + } finally { + docker.close() + } + } } finally { super.afterAll() } } -} - -/** - * A factory and morgue for DockerClient objects. In the DockerClient we use, - * calling close() closes the desired DockerClient but also renders all other - * DockerClients inoperable. This is inconvenient if we have more than one - * open, such as during tests. - */ -object DockerClientFactory { - var numClients: Int = 0 - val zombies = new MutableList[DockerClient]() - - def get(): DockerClient = { - this.synchronized { - numClients = numClients + 1 - DefaultDockerClient.fromEnv.build() - } - } - def close(dc: DockerClient) { - this.synchronized { - numClients = numClients - 1 - zombies += dc - if (numClients == 0) { - zombies.foreach(_.close()) - zombies.clear() - } - } - } + /** + * Prepare databases and tables for testing. + */ + def dataPreparation(connection: Connection): Unit } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index 5b1fbed484c3..ffe0370aa6a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -27,10 +27,10 @@ class MySQLIntegrationSuite extends DatabaseIntegrationSuite { val env = Map( "MYSQL_ROOT_PASSWORD" -> "rootpass" ) - lazy val jdbcUrl = s"jdbc:mysql://$ip:3306/mysql?user=root&password=rootpass" + def getJdbcUrl(ip: String) = s"jdbc:mysql://$ip:3306/mysql?user=root&password=rootpass" } - override def dataPreparation(conn: Connection) { + override def dataPreparation(conn: Connection): Unit = { conn.prepareStatement("CREATE DATABASE foo").executeUpdate() conn.prepareStatement("CREATE TABLE tbl (x INTEGER, y TEXT(8))").executeUpdate() conn.prepareStatement("INSERT INTO tbl VALUES (42,'fred')").executeUpdate() @@ -57,7 +57,7 @@ class MySQLIntegrationSuite extends DatabaseIntegrationSuite { } test("Basic test") { - val df = sqlContext.read.jdbc(db.jdbcUrl, "tbl", new Properties) + val df = sqlContext.read.jdbc(jdbcUrl, "tbl", new Properties) val rows = df.collect() assert(rows.length == 2) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -67,7 +67,7 @@ class MySQLIntegrationSuite extends DatabaseIntegrationSuite { } test("Numeric types") { - val df = sqlContext.read.jdbc(db.jdbcUrl, "numbers", new Properties) + val df = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties) val rows = df.collect() assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -94,7 +94,7 @@ class MySQLIntegrationSuite extends DatabaseIntegrationSuite { } test("Date types") { - val df = sqlContext.read.jdbc(db.jdbcUrl, "dates", new Properties) + val df = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties) val rows = df.collect() assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -112,7 +112,7 @@ class MySQLIntegrationSuite extends DatabaseIntegrationSuite { } test("String types") { - val df = sqlContext.read.jdbc(db.jdbcUrl, "strings", new Properties) + val df = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties) val rows = df.collect() assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -138,11 +138,11 @@ class MySQLIntegrationSuite extends DatabaseIntegrationSuite { } test("Basic write test") { - val df1 = sqlContext.read.jdbc(db.jdbcUrl, "numbers", new Properties) - val df2 = sqlContext.read.jdbc(db.jdbcUrl, 
"dates", new Properties) - val df3 = sqlContext.read.jdbc(db.jdbcUrl, "strings", new Properties) - df1.write.jdbc(db.jdbcUrl, "numberscopy", new Properties) - df2.write.jdbc(db.jdbcUrl, "datescopy", new Properties) - df3.write.jdbc(db.jdbcUrl, "stringscopy", new Properties) + val df1 = sqlContext.read.jdbc(jdbcUrl, "numbers", new Properties) + val df2 = sqlContext.read.jdbc(jdbcUrl, "dates", new Properties) + val df3 = sqlContext.read.jdbc(jdbcUrl, "strings", new Properties) + df1.write.jdbc(jdbcUrl, "numberscopy", new Properties) + df2.write.jdbc(jdbcUrl, "datescopy", new Properties) + df3.write.jdbc(jdbcUrl, "stringscopy", new Properties) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index 242e59c5cf33..8d96998dffec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -26,10 +26,11 @@ class PostgresIntegrationSuite extends DatabaseIntegrationSuite { val env = Map( "POSTGRES_PASSWORD" -> "rootpass" ) - lazy val jdbcUrl = s"jdbc:postgresql://$ip:5432/postgres?user=postgres&password=rootpass" + override def getJdbcUrl(ip: String): String = + s"jdbc:postgresql://$ip:5432/postgres?user=postgres&password=rootpass" } - override def dataPreparation(conn: Connection) { + override def dataPreparation(conn: Connection): Unit = { conn.prepareStatement("CREATE DATABASE foo").executeUpdate() conn.setCatalog("foo") conn.prepareStatement("CREATE TABLE bar (a text, b integer, c double precision, d bigint, " @@ -39,7 +40,7 @@ class PostgresIntegrationSuite extends DatabaseIntegrationSuite { } test("Type mapping for various types") { - val df = sqlContext.read.jdbc(db.jdbcUrl, "bar", new Properties) + val df = sqlContext.read.jdbc(jdbcUrl, "bar", new Properties) val rows = df.collect() assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) @@ -70,8 +71,8 @@ class PostgresIntegrationSuite extends DatabaseIntegrationSuite { } test("Basic write test") { - val df = sqlContext.read.jdbc(db.jdbcUrl, "bar", new Properties) - df.write.jdbc(db.jdbcUrl, "public.barcopy", new Properties) - // Test only that it doesn't bomb out. + val df = sqlContext.read.jdbc(jdbcUrl, "bar", new Properties) + df.write.jdbc(jdbcUrl, "public.barcopy", new Properties) + // Test only that it doesn't crash. } } From d06847bf4b6b893a5a49bde56ca5e0441eac80d2 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 5 Nov 2015 12:20:15 -0800 Subject: [PATCH 06/19] Fix networking for boot2docker. 
--- .../apache/spark/sql/jdbc/DockerHacks.scala | 36 +++++++++++++++---- .../sql/jdbc/MySQLIntegrationSuite.scala | 8 +++-- .../sql/jdbc/PostgresIntegrationSuite.scala | 9 ++--- 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala index 5c71345d91b1..d63f66e21df9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala @@ -17,12 +17,13 @@ package org.apache.spark.sql.jdbc +import java.net.ServerSocket import java.sql.Connection import scala.collection.JavaConverters._ import scala.util.control.NonFatal -import com.spotify.docker.client.messages.ContainerConfig +import com.spotify.docker.client.messages.{ContainerConfig, HostConfig, PortBinding} import com.spotify.docker.client._ import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.Eventually @@ -30,6 +31,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.Utils abstract class DatabaseOnDocker { /** @@ -43,9 +45,14 @@ abstract class DatabaseOnDocker { val env: Map[String, String] /** - * Return a JDBC URL that connects to the database running at the given IP address. + * The container-internal JDBC port that the database listens on. */ - def getJdbcUrl(ip: String): String + val jdbcPort: Int + + /** + * Return a JDBC URL that connects to the database running at the given IP address and port. + */ + def getJdbcUrl(ip: String, port: Int): String } abstract class DatabaseIntegrationSuite @@ -72,15 +79,30 @@ abstract class DatabaseIntegrationSuite log.warn(s"Docker image ${db.imageName} not found; pulling image from registry") docker.pull(db.imageName) } - // Launch the container: + // Create the database container: val config = ContainerConfig.builder() .image(db.imageName) + .networkDisabled(false) .env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava) + .exposedPorts(s"${db.jdbcPort}/tcp") .build() containerId = docker.createContainer(config).id - docker.startContainer(containerId) - // Wait until the database has started and is accepting JDBC connections: - jdbcUrl = db.getJdbcUrl(ip = docker.inspectContainer(containerId).networkSettings.ipAddress) + // Configure networking (necessary for boot2docker / Docker Machine) + val externalPort: Int = { + val sock = new ServerSocket(0) + val port = sock.getLocalPort + sock.close() + port + } + val dockerIp = sys.env.getOrElse("DOCKER_IP", Utils.localHostName()) + val hostConfig: HostConfig = HostConfig.builder() + .networkMode("bridge") + .portBindings( + Map(s"${db.jdbcPort}/tcp" -> List(PortBinding.of(dockerIp, externalPort)).asJava).asJava) + .build() + // Start the container and wait until the database can accept JDBC connections: + docker.startContainer(containerId, hostConfig) + jdbcUrl = db.getJdbcUrl(dockerIp, externalPort) eventually(timeout(60.seconds), interval(1.seconds)) { val conn = java.sql.DriverManager.getConnection(jdbcUrl) conn.close() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index ffe0370aa6a6..d7fefa8d4629 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ 
-23,11 +23,13 @@ import java.util.Properties class MySQLIntegrationSuite extends DatabaseIntegrationSuite { val db = new DatabaseOnDocker { - val imageName = "mysql:latest" - val env = Map( + override val imageName = "mysql:latest" + override val env = Map( "MYSQL_ROOT_PASSWORD" -> "rootpass" ) - def getJdbcUrl(ip: String) = s"jdbc:mysql://$ip:3306/mysql?user=root&password=rootpass" + override val jdbcPort: Int = 3306 + override def getJdbcUrl(ip: String, port: Int): String = + s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass" } override def dataPreparation(conn: Connection): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index 8d96998dffec..5896f5d1eef8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -22,12 +22,13 @@ import java.util.Properties class PostgresIntegrationSuite extends DatabaseIntegrationSuite { val db = new DatabaseOnDocker { - val imageName = "postgres:latest" - val env = Map( + override val imageName = "postgres:latest" + override val env = Map( "POSTGRES_PASSWORD" -> "rootpass" ) - override def getJdbcUrl(ip: String): String = - s"jdbc:postgresql://$ip:5432/postgres?user=postgres&password=rootpass" + override val jdbcPort = 5432 + override def getJdbcUrl(ip: String, port: Int): String = + s"jdbc:postgresql://$ip:$port/postgres?user=postgres&password=rootpass" } override def dataPreparation(conn: Connection): Unit = { From 93bcf453e957619ae2195a43e4dcf4f56a937b68 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 5 Nov 2015 12:22:35 -0800 Subject: [PATCH 07/19] Log warning in case container cleanup fails --- .../src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala index d63f66e21df9..7abe01fc3f54 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala @@ -132,6 +132,9 @@ abstract class DatabaseIntegrationSuite docker.killContainer(containerId) docker.removeContainer(containerId) } + } catch { + case NonFatal(e) => + logWarning(s"Could not stop container $containerId", e) } finally { docker.close() } From bf98ae00cc19d019dc2ca654119b4ad4b75e4bd9 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 5 Nov 2015 12:26:41 -0800 Subject: [PATCH 08/19] Rename class and freeze image versions --- .../{DockerHacks.scala => DockerJDBCIntegrationSuite.scala} | 2 +- .../org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala | 6 +++--- .../apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) rename sql/core/src/test/scala/org/apache/spark/sql/jdbc/{DockerHacks.scala => DockerJDBCIntegrationSuite.scala} (99%) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala similarity index 99% rename from sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala rename to sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala index 7abe01fc3f54..93ad1177c5ff 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerHacks.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala @@ -55,7 +55,7 @@ abstract class DatabaseOnDocker { def getJdbcUrl(ip: String, port: Int): String } -abstract class DatabaseIntegrationSuite +abstract class DockerJDBCIntegrationSuite extends SparkFunSuite with BeforeAndAfterAll with Eventually diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index d7fefa8d4629..ff94eacd375a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -21,9 +21,9 @@ import java.math.BigDecimal import java.sql.{Connection, Date, Timestamp} import java.util.Properties -class MySQLIntegrationSuite extends DatabaseIntegrationSuite { - val db = new DatabaseOnDocker { - override val imageName = "mysql:latest" +class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { + override val db = new DatabaseOnDocker { + override val imageName = "mysql:5.7.9" override val env = Map( "MYSQL_ROOT_PASSWORD" -> "rootpass" ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index 5896f5d1eef8..9a8534c7051e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -20,9 +20,9 @@ package org.apache.spark.sql.jdbc import java.sql.Connection import java.util.Properties -class PostgresIntegrationSuite extends DatabaseIntegrationSuite { - val db = new DatabaseOnDocker { - override val imageName = "postgres:latest" +class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { + override val db = new DatabaseOnDocker { + override val imageName = "postgres:9.4.5" override val env = Map( "POSTGRES_PASSWORD" -> "rootpass" ) From 1e389e2b534e7c8da8a502eefd11abc869965620 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 5 Nov 2015 13:52:26 -0800 Subject: [PATCH 09/19] Automatically get docker IP from docker-machine and boot2docker. 
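
Rather than requiring DOCKER_IP to be exported by hand, fall back through
the common setups in order, each step best-effort. In outline, the body of
getDockerIp():

    sys.env.get("DOCKER_IP")                             // explicit override
      .orElse(findFromDockerMachine())                   // docker-machine ip $DOCKER_MACHINE_NAME
      .orElse(Try(Seq("/bin/bash", "-c",
        "boot2docker ip 2>/dev/null").!!.trim).toOption) // legacy boot2docker
      .getOrElse(Utils.localHostName())                  // daemon on this host
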
--- .../sql/jdbc/DockerJDBCIntegrationSuite.scala | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala index 93ad1177c5ff..c5d63b0b941c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala @@ -21,6 +21,8 @@ import java.net.ServerSocket import java.sql.Connection import scala.collection.JavaConverters._ +import scala.sys.process._ +import scala.util.Try import scala.util.control.NonFatal import com.spotify.docker.client.messages.{ContainerConfig, HostConfig, PortBinding} @@ -94,7 +96,7 @@ abstract class DockerJDBCIntegrationSuite sock.close() port } - val dockerIp = sys.env.getOrElse("DOCKER_IP", Utils.localHostName()) + val dockerIp = getDockerIp() val hostConfig: HostConfig = HostConfig.builder() .networkMode("bridge") .portBindings( @@ -144,6 +146,19 @@ abstract class DockerJDBCIntegrationSuite } } + private def getDockerIp(): String = { + /** If docker-machine is setup on this box, attempts to find the ip from it. */ + def findFromDockerMachine(): Option[String] = { + sys.env.get("DOCKER_MACHINE_NAME").flatMap { name => + Try(Seq("/bin/bash", "-c", s"docker-machine ip $name 2>/dev/null").!!.trim).toOption + } + } + sys.env.get("DOCKER_IP") + .orElse(findFromDockerMachine()) + .orElse(Try(Seq("/bin/bash", "-c", "boot2docker ip 2>/dev/null").!!.trim).toOption) + .getOrElse(Utils.localHostName()) + } + /** * Prepare databases and tables for testing. */ From d9c37df16b73b88f18518fd39985aaca6b824e6b Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 5 Nov 2015 13:57:54 -0800 Subject: [PATCH 10/19] Print nicer message if Docker connection fails. --- .../spark/sql/jdbc/DockerJDBCIntegrationSuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala index c5d63b0b941c..1291ac18f9de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala @@ -73,6 +73,14 @@ abstract class DockerJDBCIntegrationSuite super.beforeAll() try { docker = DefaultDockerClient.fromEnv.build() + // Check that Docker is actually up + try { + docker.ping() + } catch { + case NonFatal(e) => + log.error("Exception while connecting to Docker. Check whether Docker is running.") + throw e + } // Ensure that the Docker image is installed: try { docker.inspectImage(db.imageName) From 95f00e10b472221df5a1c6220d95daf9fd0cd5e3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 5 Nov 2015 14:12:37 -0800 Subject: [PATCH 11/19] Add test tag for excluding Docker tests in SBT. 
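
The tag has to be a Java annotation (marked with ScalaTest's
@TagAnnotation and RUNTIME retention) so it survives to runtime and can be
applied to a whole suite rather than individual tests. With it in place a
build can skip every Docker-dependent suite; one way to wire that up
(exact SBT incantation may differ) looks like:

    // Tag the suite...
    @DockerTestTag
    class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { /* ... */ }

    // ...then exclude the tag via ScalaTest's "-l" runner argument, e.g.
    //   testOptions += Tests.Argument("-l", "org.apache.spark.sql.jdbc.DockerTestTag")
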
--- .../apache/spark/sql/jdbc/DockerTestTag.java | 30 +++++++++++++++++++ .../sql/jdbc/MySQLIntegrationSuite.scala | 1 + .../sql/jdbc/PostgresIntegrationSuite.scala | 1 + 3 files changed, 32 insertions(+) create mode 100644 sql/core/src/test/java/org/apache/spark/sql/jdbc/DockerTestTag.java diff --git a/sql/core/src/test/java/org/apache/spark/sql/jdbc/DockerTestTag.java b/sql/core/src/test/java/org/apache/spark/sql/jdbc/DockerTestTag.java new file mode 100644 index 000000000000..728e366920b5 --- /dev/null +++ b/sql/core/src/test/java/org/apache/spark/sql/jdbc/DockerTestTag.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.jdbc; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.lang.annotation.ElementType; + +import org.scalatest.TagAnnotation; + +@TagAnnotation +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD, ElementType.TYPE}) +public @interface DockerTestTag {} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index ff94eacd375a..b08bbfcad535 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -21,6 +21,7 @@ import java.math.BigDecimal import java.sql.{Connection, Date, Timestamp} import java.util.Properties +@DockerTestTag class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { override val db = new DatabaseOnDocker { override val imageName = "mysql:5.7.9" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala index 9a8534c7051e..cd096588ab1f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.jdbc import java.sql.Connection import java.util.Properties +@DockerTestTag class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite { override val db = new DatabaseOnDocker { override val imageName = "postgres:9.4.5" From fcf5dc4e45098a1ce462391fd6885c79403ae3a4 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 5 Nov 2015 16:33:14 -0800 Subject: [PATCH 12/19] Upgrade to non-ancient version of docker-client. 
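
Besides hoisting the dependency into the parent pom's dependencyManagement,
the 3.x client changes how port bindings are wired up: the HostConfig is
attached when the container is created rather than passed to
startContainer(). In outline:

    // docker-client 2.7.x:
    //   containerId = docker.createContainer(config).id
    //   docker.startContainer(containerId, hostConfig)
    // docker-client 3.1.x:
    val config = ContainerConfig.builder()
      .image(db.imageName)
      .hostConfig(hostConfig)  // host config now rides along with the container config
      .build()
    containerId = docker.createContainer(config).id
    docker.startContainer(containerId)
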
--- pom.xml | 13 +++++++++++++ sql/core/pom.xml | 7 ------- .../sql/jdbc/DockerJDBCIntegrationSuite.scala | 19 ++++++++++--------- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/pom.xml b/pom.xml index 88ebceca769e..926b340e6d8c 100644 --- a/pom.xml +++ b/pom.xml @@ -763,6 +763,19 @@ 0.10 test + + com.spotify + docker-client + shaded + 3.1.10 + test + + + guava + com.google.guava + + + org.apache.curator curator-recipes diff --git a/sql/core/pom.xml b/sql/core/pom.xml index f29a34a9b970..f59fa495d5f9 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -115,14 +115,7 @@ com.spotify docker-client shaded - 2.7.7 test - - - guava - com.google.guava - - diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala index 1291ac18f9de..bf0025fa0b3e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala @@ -89,14 +89,6 @@ abstract class DockerJDBCIntegrationSuite log.warn(s"Docker image ${db.imageName} not found; pulling image from registry") docker.pull(db.imageName) } - // Create the database container: - val config = ContainerConfig.builder() - .image(db.imageName) - .networkDisabled(false) - .env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava) - .exposedPorts(s"${db.jdbcPort}/tcp") - .build() - containerId = docker.createContainer(config).id // Configure networking (necessary for boot2docker / Docker Machine) val externalPort: Int = { val sock = new ServerSocket(0) @@ -110,8 +102,17 @@ abstract class DockerJDBCIntegrationSuite .portBindings( Map(s"${db.jdbcPort}/tcp" -> List(PortBinding.of(dockerIp, externalPort)).asJava).asJava) .build() + // Create the database container: + val config = ContainerConfig.builder() + .image(db.imageName) + .networkDisabled(false) + .env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava) + .hostConfig(hostConfig) + .exposedPorts(s"${db.jdbcPort}/tcp") + .build() + containerId = docker.createContainer(config).id // Start the container and wait until the database can accept JDBC connections: - docker.startContainer(containerId, hostConfig) + docker.startContainer(containerId) jdbcUrl = db.getJdbcUrl(dockerIp, externalPort) eventually(timeout(60.seconds), interval(1.seconds)) { val conn = java.sql.DriverManager.getConnection(jdbcUrl) From 2273f87048854d5ccb8cdc855ed8870c241a20a7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 6 Nov 2015 01:03:58 -0800 Subject: [PATCH 13/19] Upgrade Jersey to avoid ASM incompatibilities / classpath ordering issues. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 926b340e6d8c..25917a2535cb 100644 --- a/pom.xml +++ b/pom.xml @@ -174,7 +174,7 @@ 3.3.2 3.2.10 2.7.8 - 1.9 + 1.19 2.5 3.5.2 1.3.9 From cba340f75e9543619cc4426c8504985300d6b1a7 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 6 Nov 2015 02:24:00 -0800 Subject: [PATCH 14/19] Experiment with bumping Jersey, even though I know we can't do that. 
--- core/pom.xml | 4 ++++ pom.xml | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/core/pom.xml b/core/pom.xml index 570a25cf325a..8a0f2f77c089 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -207,6 +207,10 @@ com.sun.jersey jersey-core + + com.sun.jersey + jersey-servlet + org.apache.mesos mesos diff --git a/pom.xml b/pom.xml index 7d041eb89482..6e23598c1928 100644 --- a/pom.xml +++ b/pom.xml @@ -698,6 +698,12 @@ ${jersey.version} ${hadoop.deps.scope} + + com.sun.jersey + jersey-servlet + ${jersey.version} + ${hadoop.deps.scope} + com.sun.jersey jersey-json From 1a09a655e9bd6e7add767fd618e530a8fa6fbdc3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 8 Nov 2015 17:36:10 -0800 Subject: [PATCH 15/19] Hack to fix IP address binding in Jenkins. --- .../sql/jdbc/DockerJDBCIntegrationSuite.scala | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala index bf0025fa0b3e..49e02b1fc121 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.jdbc -import java.net.ServerSocket +import java.net.{InetAddress, Inet4Address, NetworkInterface, ServerSocket} import java.sql.Connection import scala.collection.JavaConverters._ @@ -33,7 +33,6 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.util.Utils abstract class DatabaseOnDocker { /** @@ -165,7 +164,35 @@ abstract class DockerJDBCIntegrationSuite sys.env.get("DOCKER_IP") .orElse(findFromDockerMachine()) .orElse(Try(Seq("/bin/bash", "-c", "boot2docker ip 2>/dev/null").!!.trim).toOption) - .getOrElse(Utils.localHostName()) + .getOrElse { + // This block of code is based on Utils.findLocalInetAddress(), but is modified to blacklist + // certain interfaces. + val address = InetAddress.getLocalHost + // Address resolves to something like 127.0.1.1, which happens on Debian; try to find + // a better address using the local network interfaces + // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order + // on unix-like system. On windows, it returns in index order. + // It's more proper to pick ip address following system output order. + val blackListedIFs = Seq( + "vboxnet0", // Mac + "docker0" // Linux + ) + val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq.filter { i => + !blackListedIFs.contains(i.getName) + } + val reOrderedNetworkIFs = activeNetworkIFs.reverse + for (ni <- reOrderedNetworkIFs) { + val addresses = ni.getInetAddresses.asScala + .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress).toSeq + if (addresses.nonEmpty) { + val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head) + // because of Inet6Address.toHostName may add interface at the end if it knows about it + val strippedAddress = InetAddress.getByAddress(addr.getAddress) + return strippedAddress.getHostAddress + } + } + address.getHostAddress + } } /** From af84a477e9e4d4137b9a054496128a748ee39f99 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 8 Nov 2015 19:17:54 -0800 Subject: [PATCH 16/19] Move docker tests to own subproject. 
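
Keeping these suites in sql/core dragged docker-client and a newer Jersey
onto every module's test classpath; a dedicated module confines that
fallout, reverts the earlier jersey-servlet experiment in core, and makes
the @DockerTestTag annotation unnecessary (the whole module can simply be
skipped). The IP-detection logic also moves out of the suite into a small
shared helper, so call sites shrink to:

    import org.apache.spark.util.DockerUtils

    val dockerIp = DockerUtils.getDockerIp()

The suites can then be run in isolation, e.g. with something like
build/sbt docker-integration-tests/test (assuming the SBT project id
follows the new directory name).
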
--- core/pom.xml | 4 - docker-integration-tests/pom.xml | 104 ++++++++++++++++++ .../sql/jdbc/DockerJDBCIntegrationSuite.scala | 50 +-------- .../sql/jdbc/MySQLIntegrationSuite.scala | 1 - .../sql/jdbc/PostgresIntegrationSuite.scala | 1 - .../org/apache/spark/util/DockerUtils.scala | 68 ++++++++++++ pom.xml | 9 +- project/SparkBuild.scala | 5 +- sql/core/pom.xml | 6 - .../apache/spark/sql/jdbc/DockerTestTag.java | 30 ----- 10 files changed, 181 insertions(+), 97 deletions(-) create mode 100644 docker-integration-tests/pom.xml rename {sql/core => docker-integration-tests}/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala (68%) rename {sql/core => docker-integration-tests}/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala (99%) rename {sql/core => docker-integration-tests}/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala (99%) create mode 100644 docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala delete mode 100644 sql/core/src/test/java/org/apache/spark/sql/jdbc/DockerTestTag.java diff --git a/core/pom.xml b/core/pom.xml index 8a0f2f77c089..570a25cf325a 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -207,10 +207,6 @@ com.sun.jersey jersey-core - - com.sun.jersey - jersey-servlet - org.apache.mesos mesos diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml new file mode 100644 index 000000000000..5af5650a1e2e --- /dev/null +++ b/docker-integration-tests/pom.xml @@ -0,0 +1,104 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent_2.10 + 1.6.0-SNAPSHOT + ../pom.xml + + + spark-docker-integration-tests_2.10 + jar + Spark Project Docker Integration Tests + http://spark.apache.org/ + + docker-integration-tests + + + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + test-jar + test + + + com.spotify + docker-client + shaded + test + + + + com.sun.jersey + jersey-server + 1.19 + ${hadoop.deps.scope} + + + com.sun.jersey + jersey-core + 1.19 + ${hadoop.deps.scope} + + + com.sun.jersey + jersey-servlet + 1.19 + ${hadoop.deps.scope} + + + com.sun.jersey + jersey-json + 1.19 + + + stax + stax-api + + + + + + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala similarity index 68% rename from sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala rename to docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala index 49e02b1fc121..c503c4a13b48 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala +++ b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala @@ -17,21 +17,20 @@ package org.apache.spark.sql.jdbc -import java.net.{InetAddress, Inet4Address, NetworkInterface, ServerSocket} +import java.net.ServerSocket import java.sql.Connection import scala.collection.JavaConverters._ -import scala.sys.process._ -import scala.util.Try import scala.util.control.NonFatal -import com.spotify.docker.client.messages.{ContainerConfig, HostConfig, PortBinding} import com.spotify.docker.client._ +import 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
similarity index 68%
rename from sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
rename to docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
index 49e02b1fc121..c503c4a13b48 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
+++ b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
@@ -17,21 +17,20 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.net.{InetAddress, Inet4Address, NetworkInterface, ServerSocket}
+import java.net.ServerSocket
 import java.sql.Connection
 
 import scala.collection.JavaConverters._
-import scala.sys.process._
-import scala.util.Try
 import scala.util.control.NonFatal
 
-import com.spotify.docker.client.messages.{ContainerConfig, HostConfig, PortBinding}
 import com.spotify.docker.client._
+import com.spotify.docker.client.messages.{ContainerConfig, HostConfig, PortBinding}
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.Eventually
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.DockerUtils
 import org.apache.spark.sql.test.SharedSQLContext
 
 abstract class DatabaseOnDocker {
@@ -95,7 +94,7 @@
       sock.close()
       port
     }
-    val dockerIp = getDockerIp()
+    val dockerIp = DockerUtils.getDockerIp()
     val hostConfig: HostConfig = HostConfig.builder()
       .networkMode("bridge")
       .portBindings(
@@ -154,47 +153,6 @@
     }
   }
 
-  private def getDockerIp(): String = {
-    /** If docker-machine is setup on this box, attempts to find the ip from it. */
-    def findFromDockerMachine(): Option[String] = {
-      sys.env.get("DOCKER_MACHINE_NAME").flatMap { name =>
-        Try(Seq("/bin/bash", "-c", s"docker-machine ip $name 2>/dev/null").!!.trim).toOption
-      }
-    }
-    sys.env.get("DOCKER_IP")
-      .orElse(findFromDockerMachine())
-      .orElse(Try(Seq("/bin/bash", "-c", "boot2docker ip 2>/dev/null").!!.trim).toOption)
-      .getOrElse {
-        // This block of code is based on Utils.findLocalInetAddress(), but is modified to
-        // blacklist certain interfaces.
-        val address = InetAddress.getLocalHost
-        // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
-        // a better address using the local network interfaces
-        // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order
-        // on unix-like system. On windows, it returns in index order.
-        // It's more proper to pick ip address following system output order.
-        val blackListedIFs = Seq(
-          "vboxnet0",  // Mac
-          "docker0"    // Linux
-        )
-        val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq.filter { i =>
-          !blackListedIFs.contains(i.getName)
-        }
-        val reOrderedNetworkIFs = activeNetworkIFs.reverse
-        for (ni <- reOrderedNetworkIFs) {
-          val addresses = ni.getInetAddresses.asScala
-            .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress).toSeq
-          if (addresses.nonEmpty) {
-            val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
-            // because of Inet6Address.toHostName may add interface at the end if it knows about it
-            val strippedAddress = InetAddress.getByAddress(addr.getAddress)
-            return strippedAddress.getHostAddress
-          }
-        }
-        address.getHostAddress
-      }
-  }
-
   /**
    * Prepare databases and tables for testing.
    */
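The two-step dance visible in the middle hunk — open a throwaway ServerSocket so the kernel picks a free port, then hand that port to a bridge-mode PortBinding — is how each suite exposes its database container to the host. Condensed into a sketch (it uses the same Spotify docker-client builder API as the suite; the "5432/tcp" key is illustrative, the real key comes from the database's JDBC port):

    import java.net.ServerSocket
    import scala.collection.JavaConverters._
    import com.spotify.docker.client.messages.{HostConfig, PortBinding}

    // Port 0 asks the OS for any free ephemeral port. (Small race: another
    // process could grab the port between close() and the container binding it.)
    val externalPort: Int = {
      val sock = new ServerSocket(0)
      val port = sock.getLocalPort
      sock.close()
      port
    }

    // Map the container's JDBC port onto the free host port in bridge mode.
    val hostConfig: HostConfig = HostConfig.builder()
      .networkMode("bridge")
      .portBindings(
        Map("5432/tcp" -> List(PortBinding.of("0.0.0.0", externalPort)).asJava).asJava)
      .build()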
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
similarity index 99%
rename from sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
rename to docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index b08bbfcad535..ff94eacd375a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -21,7 +21,6 @@
 import java.math.BigDecimal
 import java.sql.{Connection, Date, Timestamp}
 import java.util.Properties
 
-@DockerTestTag
 class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
   override val db = new DatabaseOnDocker {
     override val imageName = "mysql:5.7.9"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
similarity index 99%
rename from sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
rename to docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index cd096588ab1f..9a8534c7051e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -20,7 +20,6 @@
 package org.apache.spark.sql.jdbc
 
 import java.sql.Connection
 import java.util.Properties
 
-@DockerTestTag
 class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
   override val db = new DatabaseOnDocker {
     override val imageName = "postgres:9.4.5"
diff --git a/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala b/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala
new file mode 100644
index 000000000000..87271776d856
--- /dev/null
+++ b/docker-integration-tests/src/test/scala/org/apache/spark/util/DockerUtils.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.net.{Inet4Address, NetworkInterface, InetAddress}
+
+import scala.collection.JavaConverters._
+import scala.sys.process._
+import scala.util.Try
+
+private[spark] object DockerUtils {
+
+  def getDockerIp(): String = {
+    /** If docker-machine is setup on this box, attempts to find the ip from it. */
+    def findFromDockerMachine(): Option[String] = {
+      sys.env.get("DOCKER_MACHINE_NAME").flatMap { name =>
+        Try(Seq("/bin/bash", "-c", s"docker-machine ip $name 2>/dev/null").!!.trim).toOption
+      }
+    }
+    sys.env.get("DOCKER_IP")
+      .orElse(findFromDockerMachine())
+      .orElse(Try(Seq("/bin/bash", "-c", "boot2docker ip 2>/dev/null").!!.trim).toOption)
+      .getOrElse {
+        // This block of code is based on Utils.findLocalInetAddress(), but is modified to
+        // blacklist certain interfaces.
+        val address = InetAddress.getLocalHost
+        // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
+        // a better address using the local network interfaces
+        // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order
+        // on unix-like system. On windows, it returns in index order.
+        // It's more proper to pick ip address following system output order.
+        val blackListedIFs = Seq(
+          "vboxnet0",  // Mac
+          "docker0"    // Linux
+        )
+        val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq.filter { i =>
+          !blackListedIFs.contains(i.getName)
+        }
+        val reOrderedNetworkIFs = activeNetworkIFs.reverse
+        for (ni <- reOrderedNetworkIFs) {
+          val addresses = ni.getInetAddresses.asScala
+            .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress).toSeq
+          if (addresses.nonEmpty) {
+            val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
+            // because of Inet6Address.toHostName may add interface at the end if it knows about it
+            val strippedAddress = InetAddress.getByAddress(addr.getAddress)
+            return strippedAddress.getHostAddress
+          }
+        }
+        address.getHostAddress
+      }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 6e23598c1928..7c6c31d0d948 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,7 @@
     <module>sql/catalyst</module>
     <module>sql/core</module>
     <module>sql/hive</module>
+    <module>docker-integration-tests</module>
    <module>unsafe</module>
     <module>assembly</module>
     <module>external/twitter</module>
@@ -175,7 +176,7 @@
     <commons-lang3.version>3.3.2</commons-lang3.version>
     <datanucleus-core.version>3.2.10</datanucleus-core.version>
     <janino.version>2.7.8</janino.version>
-    <jersey.version>1.19</jersey.version>
+    <jersey.version>1.9</jersey.version>
     <joda.version>2.9</joda.version>
     <jodd.version>3.5.2</jodd.version>
     <jsr305.version>1.3.9</jsr305.version>
@@ -698,12 +699,6 @@
         <version>${jersey.version}</version>
         <scope>${hadoop.deps.scope}</scope>
       </dependency>
-      <dependency>
-        <groupId>com.sun.jersey</groupId>
-        <artifactId>jersey-servlet</artifactId>
-        <version>${jersey.version}</version>
-        <scope>${hadoop.deps.scope}</scope>
-      </dependency>
       <dependency>
         <groupId>com.sun.jersey</groupId>
         <artifactId>jersey-json</artifactId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index b75ed13a78c6..ee5c5ad6d5ed 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -42,8 +42,9 @@
     "streaming-zeromq", "launcher", "unsafe", "test-tags").map(ProjectRef(buildLocation, _))
 
   val optionallyEnabledProjects@Seq(yarn, yarnStable, java8Tests, sparkGangliaLgpl,
-    streamingKinesisAsl) = Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl",
-    "streaming-kinesis-asl").map(ProjectRef(buildLocation, _))
+    streamingKinesisAsl, dockerIntegrationTests) =
+    Seq("yarn", "yarn-stable", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl",
+      "docker-integration-tests").map(ProjectRef(buildLocation, _))
 
   val assemblyProjects@Seq(assembly, examples, networkYarn, streamingFlumeAssembly,
     streamingKafkaAssembly, streamingMqttAssembly, streamingKinesisAslAssembly) =
     Seq("assembly", "examples", "network-yarn", "streaming-flume-assembly",
       "streaming-kafka-assembly", "streaming-mqtt-assembly", "streaming-kinesis-asl-assembly")
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 4be020ca5198..c96855e261ee 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -110,12 +110,6 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>com.spotify</groupId>
-      <artifactId>docker-client</artifactId>
-      <classifier>shaded</classifier>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
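With DockerUtils extracted, the IP lookup is self-contained and reusable from any test in the new module. Hypothetical usage (the resolution order is: the DOCKER_IP env var, docker-machine, boot2docker, then the interface scan):

    import org.apache.spark.util.DockerUtils

    val dockerIp = DockerUtils.getDockerIp()  // e.g. "192.168.99.100" under docker-machine

Note also that SparkBuild.scala now lists the module under optionallyEnabledProjects, so, like yarn or streaming-kinesis-asl, it only takes part in an sbt build when its profile is explicitly enabled.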
diff --git a/sql/core/src/test/java/org/apache/spark/sql/jdbc/DockerTestTag.java b/sql/core/src/test/java/org/apache/spark/sql/jdbc/DockerTestTag.java
deleted file mode 100644
index 728e366920b5..000000000000
--- a/sql/core/src/test/java/org/apache/spark/sql/jdbc/DockerTestTag.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.jdbc;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-import java.lang.annotation.ElementType;
-
-import org.scalatest.TagAnnotation;
-
-@TagAnnotation
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.METHOD, ElementType.TYPE})
-public @interface DockerTestTag {}

From 4899b2e91f474bad79fe1b5d44654f9b76f2b6b0 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sun, 8 Nov 2015 19:27:21 -0800
Subject: [PATCH 17/19] Add DockerTest tag.

---
 docker-integration-tests/pom.xml              |  6 ++++++
 .../sql/jdbc/MySQLIntegrationSuite.scala      |  3 +++
 .../sql/jdbc/PostgresIntegrationSuite.scala   |  3 +++
 .../org/apache/spark/tags/DockerTest.java     | 26 ++++++++++++++++++++
 4 files changed, 38 insertions(+)
 create mode 100644 tags/src/main/java/org/apache/spark/tags/DockerTest.java

diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml
index 5af5650a1e2e..30b4b5053e73 100644
--- a/docker-integration-tests/pom.xml
+++ b/docker-integration-tests/pom.xml
@@ -59,6 +59,12 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>com.spotify</groupId>
       <artifactId>docker-client</artifactId>
diff --git a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index ff94eacd375a..c68e4dc4933b 100644
--- a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -21,6 +21,9 @@
 import java.math.BigDecimal
 import java.sql.{Connection, Date, Timestamp}
 import java.util.Properties
 
+import org.apache.spark.tags.DockerTest
+
+@DockerTest
 class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
   override val db = new DatabaseOnDocker {
     override val imageName = "mysql:5.7.9"
diff --git a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 9a8534c7051e..164a7f396280 100644
--- a/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -20,6 +20,9 @@
 package org.apache.spark.sql.jdbc
 
 import java.sql.Connection
 import java.util.Properties
 
+import org.apache.spark.tags.DockerTest
+
+@DockerTest
 class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
   override val db = new DatabaseOnDocker {
     override val imageName = "postgres:9.4.5"
diff --git a/tags/src/main/java/org/apache/spark/tags/DockerTest.java b/tags/src/main/java/org/apache/spark/tags/DockerTest.java
new file mode 100644
index 000000000000..0fecf3b8f979
--- /dev/null
+++ b/tags/src/main/java/org/apache/spark/tags/DockerTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.tags;
+
+import java.lang.annotation.*;
+import org.scalatest.TagAnnotation;
+
+@TagAnnotation
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD, ElementType.TYPE})
+public @interface DockerTest { }
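The annotation's fully-qualified class name doubles as a ScalaTest tag name, which is what lets a harness include or exclude every Docker-dependent suite at once. A minimal sketch of excluding them under plain ScalaTest-on-sbt wiring (illustrative; Spark's own harness configuration is not part of this patch):

    // ScalaTest's -l flag excludes suites carrying the given tag annotation;
    // -n would instead run only those suites.
    testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest,
      "-l", "org.apache.spark.tags.DockerTest")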
From ef8520080b5f684f56a77d1424626f147cf100dd Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 9 Nov 2015 00:57:57 -0800
Subject: [PATCH 18/19] Use non-shaded Docker client.

---
 docker-integration-tests/pom.xml | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml
index 30b4b5053e73..7a13b5361ba7 100644
--- a/docker-integration-tests/pom.xml
+++ b/docker-integration-tests/pom.xml
@@ -35,10 +35,16 @@
   <dependencies>
+    <dependency>
+      <groupId>com.spotify</groupId>
+      <artifactId>docker-client</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -51,6 +57,7 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -65,12 +72,6 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>com.spotify</groupId>
-      <artifactId>docker-client</artifactId>
-      <classifier>shaded</classifier>
-      <scope>test</scope>
-    </dependency>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.jaxrs</groupId>
+          <artifactId>jackson-jaxrs-json-provider</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.datatype</groupId>
+          <artifactId>jackson-datatype-guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.core</groupId>
+          <artifactId>jersey-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.connectors</groupId>
+          <artifactId>jersey-apache-connector</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.media</groupId>
+          <artifactId>jersey-media-json-jackson</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>18.0</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git a/pom.xml b/pom.xml
index 7c6c31d0d948..4a96688fb017 100644
--- a/pom.xml
+++ b/pom.xml
@@ -781,7 +781,7 @@
       <dependency>
         <groupId>com.spotify</groupId>
         <artifactId>docker-client</artifactId>
         <classifier>shaded</classifier>
-        <version>3.1.10</version>
+        <version>3.2.1</version>
         <scope>test</scope>
       </dependency>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index ee5c5ad6d5ed..806ac5294466 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -242,6 +242,8 @@
 
   enable(Flume.settings)(streamingFlumeSink)
 
+  enable(DockerIntegrationTests.settings)(dockerIntegrationTests)
+
   /**
    * Adds the ability to run the spark shell directly from SBT without building an assembly
@@ -293,6 +295,13 @@
 object Flume {
   lazy val settings = sbtavro.SbtAvro.avroSettings
 }
 
+object DockerIntegrationTests {
+  // This serves to override the override specified in DependencyOverrides:
+  lazy val settings = Seq(
+    dependencyOverrides += "com.google.guava" % "guava" % "18.0"
+  )
+}
+
 /**
  * Overrides to work around sbt's dependency resolution being different from Maven's.
  */
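The Guava pin deserves a word: the non-shaded docker-client needs a newer Guava than the one Spark's global DependencyOverrides fixes for Hadoop compatibility, so the new DockerIntegrationTests settings add a second, project-level override. A sketch of the mechanism (sbt 0.13 syntax; the global version shown is illustrative, not taken from this patch):

    // Global pin, applied to every project (illustrative value):
    dependencyOverrides += "com.google.guava" % "guava" % "14.0.1"

    // Project-level pin applied later through enable(...), which, per the
    // comment in the patch, wins for docker-integration-tests only:
    lazy val settings = Seq(
      dependencyOverrides += "com.google.guava" % "guava" % "18.0"
    )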