From fa94a6c6f758ee6a52a9d1d300cadf6a6964193b Mon Sep 17 00:00:00 2001 From: Karuppayya Rajendran Date: Wed, 13 May 2020 23:18:38 -0700 Subject: [PATCH 1/3] [SPARK-31692][SQL] Pass hadoop confs specified via Spark confs to URLStreamHandlerFactory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pass hadoop confs specified via Spark confs to URLStreamHandlerFactory **BEFORE** ``` ➜ spark git:(SPARK-31692) ✗ ./bin/spark-shell --conf spark.hadoop.fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem scala> spark.sharedState res0: org.apache.spark.sql.internal.SharedState = org.apache.spark.sql.internal.SharedState5793cd84 scala> new java.net.URL("file:///tmp/1.txt").openConnection.getInputStream res1: java.io.InputStream = org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream22846025 scala> import org.apache.hadoop.fs._ import org.apache.hadoop.fs._ scala> FileSystem.get(new Path("file:///tmp/1.txt").toUri, spark.sparkContext.hadoopConfiguration) res2: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem5a930c03 ``` **AFTER** ``` ➜ spark git:(SPARK-31692) ✗ ./bin/spark-shell --conf spark.hadoop.fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem scala> spark.sharedState res0: org.apache.spark.sql.internal.SharedState = org.apache.spark.sql.internal.SharedState5c24a636 scala> new java.net.URL("file:///tmp/1.txt").openConnection.getInputStream res1: java.io.InputStream = org.apache.hadoop.fs.FSDataInputStream2ba8f528 scala> import org.apache.hadoop.fs._ import org.apache.hadoop.fs._ scala> FileSystem.get(new Path("file:///tmp/1.txt").toUri, spark.sparkContext.hadoopConfiguration) res2: org.apache.hadoop.fs.FileSystem = LocalFS scala> FileSystem.get(new Path("file:///tmp/1.txt").toUri, spark.sparkContext.hadoopConfiguration).getClass res3: Class[_ <: org.apache.hadoop.fs.FileSystem] = class org.apache.hadoop.fs.RawLocalFileSystem ``` The type of FileSystem object created (you can check the last statement
in the above snippets) in the above two cases are different, which should not have been the case. No user-facing change. Tested locally; added a unit test. Closes #28516 from karuppayya/SPARK-31692. Authored-by: Karuppayya Rajendran Signed-off-by: Dongjoon Hyun (cherry picked from commit 72601460ada41761737f39d5dff8e69444fce2ba) --- .../spark/sql/internal/SharedState.scala | 12 ++-- .../spark/sql/internal/SharedStateSuite.scala | 55 +++++++++++++++++++ 2 files changed, 63 insertions(+), 4 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 4d2be13c4841..0fe03cb1f6c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -22,13 +22,11 @@ import java.util.Locale import scala.reflect.ClassTag import scala.util.control.NonFatal - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FsUrlStreamHandlerFactory - import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.internal.Logging -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{SQLContext, SparkSession} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab} @@ -157,7 +155,13 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { object SharedState extends Logging { try { - URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()) + SparkSession.getActiveSession match { + case Some(spark) => + URL.setURLStreamHandlerFactory( + new FsUrlStreamHandlerFactory(spark.sparkContext.hadoopConfiguration)) + case _ => + URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()) + }
} catch { case e: Error => logWarning("URL.setURLStreamHandlerFactory failed to set FsUrlStreamHandlerFactory") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala new file mode 100644 index 000000000000..e4ccbf824968 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.internal + +import java.net.URL + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FsUrlStreamHandlerFactory + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.sql.test.SharedSparkSession + + +/** + * Tests for [[org.apache.spark.sql.internal.SharedState]]. 
+ */ +class SharedStateSuite extends SparkFunSuite with SharedSparkSession { + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.hadoop.fs.defaultFS", "file:///") + } + + test("SPARK-31692: Url handler factory should have the hadoop configs from Spark conf") { + // Accessing shared state's external catalog to init the code in object SharedState + spark.sharedState.externalCatalog + val field = classOf[URL].getDeclaredField("factory") + field.setAccessible(true) + val value = field.get(null) + assert(value.isInstanceOf[FsUrlStreamHandlerFactory]) + val streamFactory = value.asInstanceOf[FsUrlStreamHandlerFactory] + + val confField = classOf[FsUrlStreamHandlerFactory].getDeclaredField("conf") + confField.setAccessible(true) + val conf = confField.get(streamFactory) + + assert(conf.isInstanceOf[Configuration]) + assert(conf.asInstanceOf[Configuration].get("fs.defaultFS") == "file:///") + } +} From e7b378f12a7bf0ad6fc35f3bc58d83856b824d07 Mon Sep 17 00:00:00 2001 From: Karuppayya Rajendran Date: Thu, 14 May 2020 10:09:23 -0700 Subject: [PATCH 2/3] Fix: Add back blank lines in imports --- .../scala/org/apache/spark/sql/internal/SharedState.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 0fe03cb1f6c7..d01c82f37b2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -22,11 +22,13 @@ import java.util.Locale import scala.reflect.ClassTag import scala.util.control.NonFatal + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FsUrlStreamHandlerFactory + import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{SQLContext, SparkSession} +import 
org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab} From c96d6b0e4bd2effbf8ddc2f2d7acd0890dae168a Mon Sep 17 00:00:00 2001 From: Karuppayya Rajendran Date: Thu, 14 May 2020 10:41:46 -0700 Subject: [PATCH 3/3] Fix: Fix style checks --- .../main/scala/org/apache/spark/sql/internal/SharedState.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index d01c82f37b2a..ec82c06fd323 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -159,8 +159,10 @@ object SharedState extends Logging { try { SparkSession.getActiveSession match { case Some(spark) => + // scalastyle:off hadoopconfiguration URL.setURLStreamHandlerFactory( + new FsUrlStreamHandlerFactory(spark.sparkContext.hadoopConfiguration)) + // scalastyle:on hadoopconfiguration case _ => URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()) }