|
18 | 18 | package org.apache.spark.sql.hive.client |
19 | 19 |
|
20 | 20 | import java.io.{File, PrintStream} |
| 21 | +import java.net.{MalformedURLException, URL} |
21 | 22 |
|
22 | 23 | import scala.collection.JavaConverters._ |
23 | 24 | import scala.collection.mutable.ArrayBuffer |
24 | 25 | import scala.language.reflectiveCalls |
25 | 26 |
|
26 | 27 | import org.apache.hadoop.conf.Configuration |
27 | | -import org.apache.hadoop.fs.Path |
| 28 | +import org.apache.hadoop.fs.{FileSystem, FsUrlStreamHandlerFactory, Path} |
28 | 29 | import org.apache.hadoop.hive.conf.HiveConf |
29 | 30 | import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} |
30 | 31 | import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema} |
@@ -86,6 +87,9 @@ private[hive] class HiveClientImpl( |
86 | 87 | // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. |
87 | 88 | private val outputBuffer = new CircularBuffer() |
88 | 89 |
|
| 90 | + // An object lock to ensure URL factory is registered exactly once. |
| 91 | + object URLFactoryRegistrationLock{} |
| 92 | + |
89 | 93 | private val shim = version match { |
90 | 94 | case hive.v12 => new Shim_v0_12() |
91 | 95 | case hive.v13 => new Shim_v0_13() |
@@ -690,7 +694,27 @@ private[hive] class HiveClientImpl( |
690 | 694 | new File(path).toURI.toURL |
691 | 695 | } else { |
692 | 696 | // `path` is a URL with a scheme |
693 | | - uri.toURL |
| 697 | + try { |
| 698 | + uri.toURL |
| 699 | + } catch { |
| 700 | + case e: MalformedURLException => |
| 701 | + Option(FileSystem.get(uri, hadoopConf)) match { |
| 702 | + case Some(fs) => |
| 703 | + URLFactoryRegistrationLock.synchronized { |
| 704 | + try { |
| 705 | + // check one more time, in case another thread set the factory. |
| 706 | + uri.toURL |
| 707 | + } catch { |
| 708 | + case e: MalformedURLException => |
| 709 | +                        // Register the URLStreamHandlerFactory so HDFS URLs work. |
| 710 | + URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory(hadoopConf)) |
| 711 | + uri.toURL |
| 712 | + } |
| 713 | + } |
| 714 | + case None => |
| 715 | + throw e |
| 716 | + } |
| 717 | + } |
694 | 718 | } |
695 | 719 | clientLoader.addJar(jarURL) |
696 | 720 | runSqlHive(s"ADD JAR $path") |
|
0 commit comments