sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -25,11 +25,15 @@ import scala.concurrent.{Await, Promise}
import scala.sys.process.{Process, ProcessLogger}

import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+import org.scalatest.{BeforeAndAfter, FunSuite}

import org.apache.spark.Logging
import org.apache.spark.util.Utils

+/**
+ * A test suite for the `spark-sql` CLI tool. Note that all test cases share the same temporary
+ * Hive metastore and warehouse.
+ */
class CliSuite extends FunSuite with BeforeAndAfter with Logging {
val warehousePath = Utils.createTempDir()
val metastorePath = Utils.createTempDir()
@@ -58,13 +62,13 @@ class CliSuite extends FunSuite with BeforeAndAfter with Logging {
| --master local
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl
| --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
| --driver-class-path ${sys.props("java.class.path")}
""".stripMargin.split("\\s+").toSeq ++ extraArgs
}

    var next = 0
    val foundAllExpectedAnswers = Promise.apply[Unit]()
-    val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes)
+    // Explicitly adds ENTER for each statement to make sure they are actually entered into the CLI.
+    val queryStream = new ByteArrayInputStream(queries.map(_ + "\n").mkString.getBytes)
    val buffer = new ArrayBuffer[String]()
    val lock = new Object
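For illustration, here is a minimal sketch of why the trailing ENTER matters (the values below are stand-ins, not the suite's actual fields):

    import java.io.ByteArrayInputStream

    val statements = Seq("SHOW TABLES;", "SELECT 1;")

    // mkString("\n") only inserts separators between elements, so the final
    // statement never receives ENTER and sits unexecuted in the CLI's buffer:
    val missingFinalEnter = new ByteArrayInputStream(statements.mkString("\n").getBytes)

    // Mapping each statement to "statement\n" guarantees every one is submitted:
    val allEntered = new ByteArrayInputStream(statements.map(_ + "\n").mkString.getBytes)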

@@ -124,7 +128,7 @@ class CliSuite extends FunSuite with BeforeAndAfter with Logging {
"SELECT COUNT(*) FROM hive_test;"
-> "5",
"DROP TABLE hive_test;"
-> "Time taken: "
-> "OK"
)
}

@@ -151,4 +155,33 @@ class CliSuite extends FunSuite with BeforeAndAfter with Logging {
-> "hive_test"
)
}

test("Commands using SerDe provided in --jars") {
val jarFile =
"../hive/src/test/resources/hive-hcatalog-core-0.13.1.jar"
Review comment (Contributor):
A relative path will probably cause problems, since the PWD may not be the root of the Spark source tree. Can we get it the same way we did for dataFilePath?

Reply (Author):
This is because hive-hcatalog-core-0.13.1.jar lives under another module (hive). Also, when a module's test cases are executed from SBT or Maven, the working directory is always that module's directory.

+        .split("/")
+        .mkString(File.separator)
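As the thread above notes, the relative path relies on SBT and Maven always running a module's tests from that module's directory. For comparison, a PWD-independent variant might anchor the path at an absolute source-root property. This is a hypothetical sketch; the spark.test.home property and the names below are assumptions, not part of this PR:

    import java.io.File

    // Hypothetical: resolve the jar against an absolute source-root property
    // instead of the build tool's working directory.
    val sparkSourceRoot = new File(sys.props("spark.test.home"))
    val hcatalogJar =
      new File(sparkSourceRoot, "sql/hive/src/test/resources/hive-hcatalog-core-0.13.1.jar")
    require(hcatalogJar.isFile, s"Test jar not found: $hcatalogJar")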

+    val dataFilePath =
+      Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")

+    runCliWithin(1.minute, Seq("--jars", s"$jarFile"))(
Review comment (Contributor):
We probably also need to add a test suite for metastore version 0.12.0.

Reply (Author):
Makes sense... but let's defer that to a later PR :)

"""CREATE TABLE t1(key string, val string)
|ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
""".stripMargin
-> "OK",
"CREATE TABLE sourceTable (key INT, val STRING);"
-> "OK",
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;"
-> "OK",
"INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;"
-> "Time taken:",
"SELECT count(key) FROM t1;"
-> "5",
"DROP TABLE t1;"
-> "OK",
"DROP TABLE sourceTable;"
-> "OK"
)
}
}
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -18,14 +18,15 @@
package org.apache.spark.sql.hive

import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
+import java.net.{URL, URLClassLoader}
import java.sql.Timestamp
import java.util.{ArrayList => JArrayList}

import org.apache.hadoop.hive.ql.parse.VariableSubstitution
import org.apache.spark.sql.catalyst.ParserDialect

import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.language.implicitConversions

import org.apache.hadoop.fs.{FileSystem, Path}
@@ -188,8 +189,19 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
"Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " +
s"or change $HIVE_METASTORE_VERSION to $hiveExecutionVersion.")
}
val jars = getClass.getClassLoader match {
case urlClassLoader: java.net.URLClassLoader => urlClassLoader.getURLs
// We recursively add all jars in the class loader chain,
// starting from the given urlClassLoader.
def addJars(urlClassLoader: URLClassLoader): Array[URL] = {
Review comment (Contributor):
addJars => some more meaningful name?

+      val jarsInParent = urlClassLoader.getParent match {
+        case parent: URLClassLoader => addJars(parent)
+        case other => Array.empty[URL]
+      }
+
+      urlClassLoader.getURLs ++ jarsInParent
+    }

+    val jars = Utils.getContextOrSparkClassLoader match {
+      case urlClassLoader: URLClassLoader => addJars(urlClassLoader)
      case other =>
        throw new IllegalArgumentException(
          "Unable to locate hive jars to connect to metastore " +
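Restating the traversal added above as a self-contained sketch (collectJarUrls is an illustrative name, in the spirit of the reviewer's naming suggestion):

    import java.net.{URL, URLClassLoader}

    // Collect the URLs of every URLClassLoader in the parent chain. The
    // bootstrap loader is represented by null, which falls through to the
    // empty case and terminates the recursion.
    def collectJarUrls(loader: ClassLoader): Array[URL] = loader match {
      case urlLoader: URLClassLoader => urlLoader.getURLs ++ collectJarUrls(urlLoader.getParent)
      case _ => Array.empty[URL]
    }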
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -79,7 +79,7 @@ class HadoopTableReader(
      makeRDDForTable(
        hiveTable,
        Class.forName(
-          relation.tableDesc.getSerdeClassName, true, Utils.getSparkClassLoader)
+          relation.tableDesc.getSerdeClassName, true, Utils.getContextOrSparkClassLoader)
          .asInstanceOf[Class[Deserializer]],
        filterOpt = None)
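Why this one-line change matters: jars passed via --jars are appended to the thread's context class loader at runtime rather than to the loader that loaded Spark's own classes, so SerDe lookups must consult the context loader first. A minimal sketch of that fallback, assumed to mirror what Utils.getContextOrSparkClassLoader does:

    // Prefer the thread context class loader (where --jars land at runtime),
    // falling back to the loader that loaded this class.
    def resolveSerde(className: String): Class[_] = {
      val loader = Option(Thread.currentThread().getContextClassLoader)
        .getOrElse(getClass.getClassLoader)
      Class.forName(className, true, loader)
    }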