diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 81d5a124e9d4..e71a101755c5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -18,13 +18,14 @@ package org.apache.spark.sql.hive.client import java.io.{File, PrintStream} +import java.net.{MalformedURLException, URL} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.language.reflectiveCalls import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, FsUrlStreamHandlerFactory, Path} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema} @@ -86,6 +87,9 @@ private[hive] class HiveClientImpl( // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. private val outputBuffer = new CircularBuffer() + // An object lock to ensure URL factory is registered exactly once. + object URLFactoryRegistrationLock + private val shim = version match { case hive.v12 => new Shim_v0_12() case hive.v13 => new Shim_v0_13() @@ -690,7 +694,27 @@ private[hive] class HiveClientImpl( new File(path).toURI.toURL } else { // `path` is a URL with a scheme - uri.toURL + try { + uri.toURL + } catch { + case e: MalformedURLException => + Option(FileSystem.get(uri, hadoopConf)) match { + case Some(fs) => + URLFactoryRegistrationLock.synchronized { + try { + // check one more time, in case another thread set the factory. + uri.toURL + } catch { + case e: MalformedURLException => + // Register the URLStreamHandlerFactory so hdfs urls work.
+ URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory(hadoopConf)) + uri.toURL + } + } + case None => + throw e + } + } } clientLoader.addJar(jarURL) runSqlHive(s"ADD JAR $path") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 3c7dbb449c52..763a9d637109 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -856,6 +856,17 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { sql("DROP TABLE alter1") } + test("SPARK-12868 ADD JAR FROM HDFS") { + val testJar = "hdfs://nn:8020/foo.jar" + // This should fail with unknown host, as it's just testing the URL parsing + // before SPARK-12868 it was failing with Malformed URI + val e = intercept[RuntimeException] { + sql(s"ADD JAR $testJar") + } + e.printStackTrace() + assert(e.getMessage.contains("java.net.UnknownHostException: nn")) + } + test("ADD JAR command 2") { // this is a test case from mapjoin_addjar.q val testJar = TestHive.getHiveFile("hive-hcatalog-core-0.13.1.jar").getCanonicalPath