From 6a6de97b646222754c0aab3f85ad6b557727fafb Mon Sep 17 00:00:00 2001
From: Parth Brahmbhatt
Date: Fri, 19 Aug 2016 10:38:31 -0700
Subject: [PATCH 1/2] SPARK-12868: Allow Add jar to add jars from hdfs/s3n
 urls.

---
 .../sql/hive/client/HiveClientImpl.scala      | 28 +++++++++++++++++--
 .../sql/hive/execution/HiveQuerySuite.scala   | 10 +++++++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 81d5a124e9d4a..00cab3a9916b3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -18,13 +18,14 @@
 package org.apache.spark.sql.hive.client
 
 import java.io.{File, PrintStream}
+import java.net.{MalformedURLException, URL}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.language.reflectiveCalls
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, FsUrlStreamHandlerFactory, Path}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema}
@@ -86,6 +87,9 @@ private[hive] class HiveClientImpl(
   // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
   private val outputBuffer = new CircularBuffer()
 
+  // An object lock to ensure URL factory is registered exactly once.
+  object URLFactoryRegistrationLock{}
+
   private val shim = version match {
     case hive.v12 => new Shim_v0_12()
     case hive.v13 => new Shim_v0_13()
@@ -690,7 +694,27 @@ private[hive] class HiveClientImpl(
       new File(path).toURI.toURL
     } else {
       // `path` is a URL with a scheme
-      uri.toURL
+      try {
+        uri.toURL
+      } catch {
+        case e: MalformedURLException =>
+          Option(FileSystem.get(uri, hadoopConf)) match {
+            case Some(fs) =>
+              URLFactoryRegistrationLock.synchronized {
+                try {
+                  // check one more time, in case another thread set the factory.
+                  uri.toURL
+                } catch {
+                  case e: MalformedURLException =>
+                    // Register the URLStreamHandlerFactory so hdfs urls work.
+                    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory(hadoopConf))
+                    uri.toURL
+                }
+              }
+            case None =>
+              throw e
+          }
+      }
     }
     clientLoader.addJar(jarURL)
     runSqlHive(s"ADD JAR $path")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 3c7dbb449c521..701dbd431a5c6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -856,6 +856,16 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     sql("DROP TABLE alter1")
   }
 
+  test("SPARK-12868 ADD JAR FROM HDFS") {
+    val testJar = "hdfs://nn:8020/foo.jar"
+    // This should fail with unknown host, as its just testing the URL parsing
+    // before SPARK-12868 it was failing with Malformed URI
+    val e = intercept[RuntimeException] {
+      sql(s"ADD JAR $testJar")
+    }
+    assert(e.getMessage.contains("java.net.UnknownHostException: nn1"))
+  }
+
   test("ADD JAR command 2") {
     // this is a test case from mapjoin_addjar.q
     val testJar = TestHive.getHiveFile("hive-hcatalog-core-0.13.1.jar").getCanonicalPath

From 078f05a637de9e9eee3e3d492ca5d15a43d7712f Mon Sep 17 00:00:00 2001
From: Parth Brahmbhatt
Date: Thu, 25 Aug 2016 15:07:53 -0700
Subject: [PATCH 2/2] SPARK-12868: Addressing comments.

---
 .../org/apache/spark/sql/hive/client/HiveClientImpl.scala    | 2 +-
 .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 00cab3a9916b3..e71a101755c5f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -88,7 +88,7 @@ private[hive] class HiveClientImpl(
   private val outputBuffer = new CircularBuffer()
 
   // An object lock to ensure URL factory is registered exactly once.
-  object URLFactoryRegistrationLock{}
+  object URLFactoryRegistrationLock
 
   private val shim = version match {
     case hive.v12 => new Shim_v0_12()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 701dbd431a5c6..763a9d6371095 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -863,7 +863,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     val e = intercept[RuntimeException] {
       sql(s"ADD JAR $testJar")
     }
-    assert(e.getMessage.contains("java.net.UnknownHostException: nn1"))
+    e.printStackTrace()
+    assert(e.getMessage.contains("java.net.UnknownHostException: nn"))
   }
 
   test("ADD JAR command 2") {