From d671facb7383936b2901ea353cdfd7cc06a57b57 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sat, 18 Mar 2017 11:55:28 -0700 Subject: [PATCH 1/5] [SPARK-18910] Allow adding jars from hdfs --- .../scala/org/apache/spark/util/Utils.scala | 23 ++++++++++++++++++- .../org/apache/spark/util/UtilsSuite.scala | 17 +++++++++++++- .../spark/sql/internal/SharedState.scala | 5 +++- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 943dde0723271..595f76e7e2fac 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -47,7 +47,7 @@ import com.google.common.io.{ByteStreams, Files => GFiles} import com.google.common.net.InetAddresses import org.apache.commons.lang3.SystemUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} +import org.apache.hadoop.fs.{FileSystem, FileUtil, FsUrlStreamHandlerFactory, Path} import org.apache.hadoop.security.UserGroupInformation import org.apache.log4j.PropertyConfigurator import org.eclipse.jetty.util.MultiException @@ -2780,3 +2780,24 @@ private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.Ou new String(nonCircularBuffer, StandardCharsets.UTF_8) } } + + +/** + * Factory for URL stream handlers. It relies on 'protocol' to choose the appropriate + * UrlStreamHandlerFactory to create URLStreamHandler. Adding new 'if' branches in + * 'createURLStreamHandler' like 'hdfsHandler' to support more protocols. + */ +private[spark] class SparkUrlStreamHandlerFactory extends URLStreamHandlerFactory { + private var hdfsHandler : URLStreamHandler = _ + + def createURLStreamHandler(protocol: String): URLStreamHandler = { + if (protocol.compareToIgnoreCase("hdfs") == 0) { + if (hdfsHandler == null) { + hdfsHandler = new FsUrlStreamHandlerFactory().createURLStreamHandler(protocol) + } + hdfsHandler + } else { + null + } + } +} diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 8ed09749ffd54..2a28267434987 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.util import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutput, DataOutputStream, File, FileOutputStream, PrintStream} import java.lang.{Double => JDouble, Float => JFloat} -import java.net.{BindException, ServerSocket, URI} +import java.net._ import java.nio.{ByteBuffer, ByteOrder} import java.nio.charset.StandardCharsets import java.text.DecimalFormatSymbols @@ -1021,4 +1021,19 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { secretKeys.foreach { key => assert(redactedConf(key) === Utils.REDACTION_REPLACEMENT_TEXT) } assert(redactedConf("spark.regular.property") === "not_a_secret") } + + test("SparkUrlStreamHandlerFactory") { + URL.setURLStreamHandlerFactory(new SparkUrlStreamHandlerFactory()) + + // if 'hdfs' is not supported, MalformedURLException will be thrown + new URL("hdfs://docs.oracle.com/test.jar") + + var exceptionThrown: Boolean = false + try { + new URL("fffs://doesnotmatter") + } catch { + case e: MalformedURLException => exceptionThrown = true + } + assert(exceptionThrown === true) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index d06dbaa2d0abc..868fbace68fb2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.internal +import java.net.URL import java.util.Locale import scala.reflect.ClassTag @@ -31,7 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab} import org.apache.spark.sql.internal.StaticSQLConf._ -import org.apache.spark.util.{MutableURLClassLoader, Utils} +import org.apache.spark.util.{MutableURLClassLoader, SparkUrlStreamHandlerFactory, Utils} /** @@ -149,6 +150,8 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { object SharedState { + URL.setURLStreamHandlerFactory(new SparkUrlStreamHandlerFactory()) + private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog" private def externalCatalogClassName(conf: SparkConf): String = { From 08810f2865c222b50118ac4b9ca31e6d151d2c94 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Wed, 22 Mar 2017 12:57:57 -0700 Subject: [PATCH 2/5] Fix review comments --- core/src/main/scala/org/apache/spark/util/Utils.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 595f76e7e2fac..63d28c43130ab 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2784,7 +2784,7 @@ private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.Ou /** * Factory for URL stream handlers. It relies on 'protocol' to choose the appropriate - * UrlStreamHandlerFactory to create URLStreamHandler. Adding new 'if' branches in + * UrlStreamHandlerFactory to create URLStreamHandler. Add new 'if' branches in * 'createURLStreamHandler' like 'hdfsHandler' to support more protocols. */ private[spark] class SparkUrlStreamHandlerFactory extends URLStreamHandlerFactory { @@ -2793,7 +2793,8 @@ private[spark] class SparkUrlStreamHandlerFactory extends URLStreamHandlerFactor def createURLStreamHandler(protocol: String): URLStreamHandler = { if (protocol.compareToIgnoreCase("hdfs") == 0) { if (hdfsHandler == null) { - hdfsHandler = new FsUrlStreamHandlerFactory().createURLStreamHandler(protocol) + hdfsHandler = new FsUrlStreamHandlerFactory(SparkHadoopUtil.get.conf) + .createURLStreamHandler(protocol) } hdfsHandler } else { From 47d36a10cdadf5b4c49db13a5851103b2edf24df Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 17 Apr 2017 21:45:20 -0700 Subject: [PATCH 3/5] Fix review comments --- .../scala/org/apache/spark/util/Utils.scala | 24 +------------------ .../org/apache/spark/util/UtilsSuite.scala | 17 +------------ .../spark/sql/internal/SharedState.scala | 6 ++--- .../org/apache/spark/sql/SQLQuerySuite.scala | 16 +++++++++++++ 4 files changed, 21 insertions(+), 42 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 63d28c43130ab..943dde0723271 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -47,7 +47,7 @@ import com.google.common.io.{ByteStreams, Files => GFiles} import com.google.common.net.InetAddresses import org.apache.commons.lang3.SystemUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, FileUtil, FsUrlStreamHandlerFactory, Path} +import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.apache.hadoop.security.UserGroupInformation import org.apache.log4j.PropertyConfigurator import org.eclipse.jetty.util.MultiException @@ -2780,25 +2780,3 @@ private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.Ou new String(nonCircularBuffer, StandardCharsets.UTF_8) } } - - -/** - * Factory for URL stream handlers. It relies on 'protocol' to choose the appropriate - * UrlStreamHandlerFactory to create URLStreamHandler. Add new 'if' branches in - * 'createURLStreamHandler' like 'hdfsHandler' to support more protocols. - */ -private[spark] class SparkUrlStreamHandlerFactory extends URLStreamHandlerFactory { - private var hdfsHandler : URLStreamHandler = _ - - def createURLStreamHandler(protocol: String): URLStreamHandler = { - if (protocol.compareToIgnoreCase("hdfs") == 0) { - if (hdfsHandler == null) { - hdfsHandler = new FsUrlStreamHandlerFactory(SparkHadoopUtil.get.conf) - .createURLStreamHandler(protocol) - } - hdfsHandler - } else { - null - } - } -} diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 2a28267434987..8ed09749ffd54 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.util import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutput, DataOutputStream, File, FileOutputStream, PrintStream} import java.lang.{Double => JDouble, Float => JFloat} -import java.net._ +import java.net.{BindException, ServerSocket, URI} import java.nio.{ByteBuffer, ByteOrder} import java.nio.charset.StandardCharsets import java.text.DecimalFormatSymbols @@ -1021,19 +1021,4 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { secretKeys.foreach { key => assert(redactedConf(key) === Utils.REDACTION_REPLACEMENT_TEXT) } assert(redactedConf("spark.regular.property") === "not_a_secret") } - - test("SparkUrlStreamHandlerFactory") { - URL.setURLStreamHandlerFactory(new SparkUrlStreamHandlerFactory()) - - // if 'hdfs' is not supported, MalformedURLException will be thrown - new URL("hdfs://docs.oracle.com/test.jar") - - var exceptionThrown: Boolean = false - try { - new URL("fffs://doesnotmatter") - } catch { - case e: MalformedURLException => exceptionThrown = true - } - assert(exceptionThrown === true) - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 868fbace68fb2..5cb327e1d96da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -24,6 +24,7 @@ import scala.reflect.ClassTag import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FsUrlStreamHandlerFactory import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.internal.Logging @@ -32,7 +33,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab} import org.apache.spark.sql.internal.StaticSQLConf._ -import org.apache.spark.util.{MutableURLClassLoader, SparkUrlStreamHandlerFactory, Utils} +import org.apache.spark.util.{MutableURLClassLoader, Utils} /** @@ -149,8 +150,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { } object SharedState { - - URL.setURLStreamHandlerFactory(new SparkUrlStreamHandlerFactory()) + URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()) private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 0dd9296a3f0ff..bc7e41db940bf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import java.io.File import java.math.MathContext +import java.net.{MalformedURLException, URL} import java.sql.Timestamp import java.util.concurrent.atomic.AtomicBoolean @@ -2606,4 +2607,19 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { case ae: AnalysisException => assert(ae.plan == null && ae.getMessage == ae.getSimpleMessage) } } + + test("SPARK-12868: Allow adding jars from hdfs ") { + val jarFromHdfs = "hdfs://doesnotmatter/test.jar" + val jarFromInvalidFs = "fffs://doesnotmatter/test.jar" + + // if 'hdfs' is not supported, MalformedURLException will be thrown + new URL(jarFromHdfs) + var exceptionThrown: Boolean = false + try { + new URL(jarFromInvalidFs) + } catch { + case e: MalformedURLException => exceptionThrown = true + } + assert(exceptionThrown === true) + } } From 48069ccb17785bf4a406459d382b13e70b2e704e Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Wed, 19 Apr 2017 23:24:21 -0700 Subject: [PATCH 4/5] Fix review comments --- .../org/apache/spark/sql/internal/SharedState.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 5cb327e1d96da..893b3ef1fb033 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -149,8 +149,13 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { } } -object SharedState { - URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()) +object SharedState extends Logging { + try { + URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()) + } catch { + case e: Error => + logWarning("URL.setURLStreamHandlerFactory failed to set FsUrlStreamHandlerFactory") + } private val HIVE_EXTERNAL_CATALOG_CLASS_NAME = "org.apache.spark.sql.hive.HiveExternalCatalog" From fb1ee811e12f05c5d31880e6d88f306148612c18 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Mon, 24 Apr 2017 21:08:17 -0700 Subject: [PATCH 5/5] Fix review comments --- .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index bc7e41db940bf..3ecbf96b41961 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2614,12 +2614,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { // if 'hdfs' is not supported, MalformedURLException will be thrown new URL(jarFromHdfs) - var exceptionThrown: Boolean = false - try { + + intercept[MalformedURLException] { new URL(jarFromInvalidFs) - } catch { - case e: MalformedURLException => exceptionThrown = true } - assert(exceptionThrown === true) } }