From c3de557ee383f3bb96ab5401db146c4cf2a13124 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 10 Sep 2019 17:44:59 +0800 Subject: [PATCH 01/15] save change --- .../hive/execution/HiveTableScanExec.scala | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 5b00e2ebafa43..1b0e5a0c0bb29 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -20,8 +20,11 @@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.common.JavaUtils +import org.apache.hadoop.hive.ql.exec.Utilities import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption @@ -120,6 +123,36 @@ case class HiveTableScanExec( HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name)) + logInfo(s"Test ADD JAR ${SessionState.get()}") + if (SessionState.get() != null) { + logInfo(s"Test ADD JAR ${SessionState.get().getConf.getClassLoader}") + logInfo("Test ADD JAR with SessionState.getConf.getClassLoader") + // scalastyle:off + try { + Class.forName(tableDesc.getSerdeClassName(), true, SessionState.get().getConf.getClassLoader) + } catch { + case e:Exception => + logInfo("Failed Test ADD JAR with SessionState.getConf.getClassLoader") + } + } + + logInfo(s"JavaUtils.getClassLoader => ${JavaUtils.getClassLoader}") + logInfo(s"SessionState.SharedState.jarClssLoader => ${sparkSession.sharedState.jarClassLoader}") + logInfo("Test ADD JAR with sharedState's JarClassloader") + // scalastyle:off + try { + Class.forName(tableDesc.getSerdeClassName(), true, sparkSession.sharedState.jarClassLoader) + } catch { + case e: Exception => + logInfo("Failed Test ADD JAR with sharedState's JarClassloader") + } + logInfo("Test ADD JAR with JavaUtils.getClassLoader") + try { + Class.forName(tableDesc.getSerdeClassName(), true, JavaUtils.getClassLoader) + } catch { + case e: Exception => + logInfo("Failed Test ADD JAR with JavaUtils.getClassLoader") + } val deserializer = tableDesc.getDeserializerClass.getConstructor().newInstance() deserializer.initialize(hiveConf, tableDesc.getProperties) From 2cf3153f046a25e38173fad0cddc1c581ed42b93 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 10 Sep 2019 19:05:07 +0800 Subject: [PATCH 02/15] Revert "save change" This reverts commit c3de557ee383f3bb96ab5401db146c4cf2a13124. --- .../hive/execution/HiveTableScanExec.scala | 33 ------------------- 1 file changed, 33 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 1b0e5a0c0bb29..5b00e2ebafa43 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -20,11 +20,8 @@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hive.common.JavaUtils -import org.apache.hadoop.hive.ql.exec.Utilities import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption @@ -123,36 +120,6 @@ case class HiveTableScanExec( HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name)) - logInfo(s"Test ADD JAR ${SessionState.get()}") - if (SessionState.get() != null) { - logInfo(s"Test ADD JAR ${SessionState.get().getConf.getClassLoader}") - logInfo("Test ADD JAR with SessionState.getConf.getClassLoader") - // scalastyle:off - try { - Class.forName(tableDesc.getSerdeClassName(), true, SessionState.get().getConf.getClassLoader) - } catch { - case e:Exception => - logInfo("Failed Test ADD JAR with SessionState.getConf.getClassLoader") - } - } - - logInfo(s"JavaUtils.getClassLoader => ${JavaUtils.getClassLoader}") - logInfo(s"SessionState.SharedState.jarClssLoader => ${sparkSession.sharedState.jarClassLoader}") - logInfo("Test ADD JAR with sharedState's JarClassloader") - // scalastyle:off - try { - Class.forName(tableDesc.getSerdeClassName(), true, sparkSession.sharedState.jarClassLoader) - } catch { - case e: Exception => - logInfo("Failed Test ADD JAR with sharedState's JarClassloader") - } - logInfo("Test ADD JAR with JavaUtils.getClassLoader") - try { - Class.forName(tableDesc.getSerdeClassName(), true, JavaUtils.getClassLoader) - } catch { - case e: Exception => - logInfo("Failed Test ADD JAR with JavaUtils.getClassLoader") - } val deserializer = tableDesc.getDeserializerClass.getConstructor().newInstance() deserializer.initialize(hiveConf, tableDesc.getProperties) From 6dc61e75fa4fd8c93be0c9b0ef540af1b9ab689a Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 18 Sep 2019 15:10:02 +0800 Subject: [PATCH 03/15] TEST-SPARK-29015 --- .../sql/hive/thriftserver/CliSuite.scala | 55 +++++++++++++++++++ .../sql/hive/client/HiveClientImpl.scala | 7 +++ 2 files changed, 62 insertions(+) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 6d45041e12821..5af6b3669b916 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -227,6 +227,34 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { ) } + test("Commands using SerDe provided jars in conf hive.aux.jars.path") { + + val dataFilePath = + Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") + val hiveContribJar = HiveTestUtils.getHiveContribJar.getCanonicalPath + + runCliWithin( + 3.minute, + Seq("--conf", s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))( + """CREATE TABLE addJarWithHiveAux(key string, val string) + |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'; + """.stripMargin + -> "", + "CREATE TABLE sourceTableForWithHiveAux (key INT, val STRING);" + -> "", + s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTableForWithHiveAux;" + -> "", + "INSERT INTO TABLE addJarWithHiveAux SELECT key, val FROM sourceTableForWithHiveAux;" + -> "", + "SELECT collect_list(array(val)) FROM addJarWithHiveAux;" + -> """[["val_238"],["val_86"],["val_311"],["val_27"],["val_165"]]""", + "DROP TABLE addJarWithHiveAux;" + -> "", + "DROP TABLE sourceTableForWithHiveAux;" + -> "" + ) + } + test("SPARK-11188 Analysis error reporting") { runCliWithin(timeout = 2.minute, errorResponses = Seq("AnalysisException"))( @@ -332,4 +360,31 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { "SELECT concat_ws(',', 'First', example_max(1234321), 'Third');" -> "First,1234321,Third" ) } + + test("SPARK-29022 Commands using SerDe provided in ADD JAR sql") { + val dataFilePath = + Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") + val hiveContribJar = HiveTestUtils.getHiveContribJar.getCanonicalPath + + runCliWithin( + 3.minute)( + s"ADD JAR ${hiveContribJar};" -> "", + """CREATE TABLE addJarWithSQL(key string, val string) + |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'; + """.stripMargin + -> "", + "CREATE TABLE sourceTableForWithSQL(key INT, val STRING);" + -> "", + s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTableForWithSQL;" + -> "", + "INSERT INTO TABLE addJarWithSQL SELECT key, val FROM sourceTableForWithSQL;" + -> "", + "SELECT collect_list(array(val)) FROM addJarWithSQL;" + -> """[["val_238"],["val_86"],["val_311"],["val_27"],["val_165"]]""", + "DROP TABLE addJarWithSQL;" + -> "", + "DROP TABLE sourceTableForWithSQL;" + -> "" + ) + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 96e61bd542806..e3a8afa0f590e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -145,6 +145,13 @@ private[hive] class HiveClientImpl( warehouseDir.foreach { dir => ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir) } + // ret != null means we have a CliSessionState instance in current thread which initialized + // by SparkSQLCLIDriver. The class loader of CliSessionState's conf is current main thread's + // class loader used to load jars passed by --jars. One class loader used by AddJarCommand + // is clientLoader.classLoader which contain jar path passed by --jars in main thread. + // We set CliSessionState's conf class loader to clientLoader.classLoader. Thus we can load + // all jars passed by --jars and AddJarCommand. + ret.getConf.setClassLoader(clientLoader.classLoader) ret } else { newState() From 0fb149c25e995bd5d2f08ece1ac65ef5f587d722 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Mon, 9 Sep 2019 15:22:16 +0800 Subject: [PATCH 04/15] SAVE METHOD will change after SPARK-28840 --- .../apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 9f554b200f775..df0fab9380b30 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -155,6 +155,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { val cli = new SparkSQLCLIDriver cli.setHiveVariables(oproc.getHiveVariables) + SessionState.get().getConf.setClassLoader(SparkSQLEnv.sqlContext.sharedState.jarClassLoader) // In SparkSQL CLI, we may want to use jars augmented by hiveconf // hive.aux.jars.path, here we add jars augmented by hiveconf to From 4047a4be721fcc6fd1ddc53dd966fab396032325 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 13 Sep 2019 00:07:54 +0800 Subject: [PATCH 05/15] fix conflict --- .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index df0fab9380b30..a8e8d04484cf0 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -155,6 +155,16 @@ private[hive] object SparkSQLCLIDriver extends Logging { val cli = new SparkSQLCLIDriver cli.setHiveVariables(oproc.getHiveVariables) + + // In SparkSQL CLI, we may want to use jars augmented by hiveconf + // hive.aux.jars.path, here we add jars augmented by hiveconf to + // Spark's SessionResourceLoader to obtain these jars. + val auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS) + if (StringUtils.isNotBlank(auxJars)) { + val resourceLoader = SparkSQLEnv.sqlContext.sessionState.resourceLoader + StringUtils.split(auxJars, ",").foreach(resourceLoader.addJar(_)) + } + SessionState.get().getConf.setClassLoader(SparkSQLEnv.sqlContext.sharedState.jarClassLoader) // In SparkSQL CLI, we may want to use jars augmented by hiveconf From bfd0579deda9ea104c801cc7330c7c5ad145a1a1 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Mon, 16 Sep 2019 23:37:17 +0800 Subject: [PATCH 06/15] add UT for use HIVEAUXJARS's class as Serde --- .../sql/hive/thriftserver/CliSuite.scala | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 5af6b3669b916..83cad24758e17 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -255,6 +255,34 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { ) } + test("Commands using SerDe provided in --hive.aux.jars.path") { + + val dataFilePath = + Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") + val hiveContribJar = HiveTestUtils.getHiveContribJar.getCanonicalPath + + runCliWithin( + 3.minute, + Seq("--conf", s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))( + """CREATE TABLE t1(key string, val string) + |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'; + """.stripMargin + -> "", + "CREATE TABLE sourceTable (key INT, val STRING);" + -> "", + s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;" + -> "", + "INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;" + -> "", + "SELECT collect_list(array(val)) FROM t1;" + -> """[["val_238"],["val_86"],["val_311"],["val_27"],["val_165"]]""", + "DROP TABLE t1;" + -> "", + "DROP TABLE sourceTable;" + -> "" + ) + } + test("SPARK-11188 Analysis error reporting") { runCliWithin(timeout = 2.minute, errorResponses = Seq("AnalysisException"))( From b729955da5d13c0698a53c8fa72aa285575bb3ea Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 17 Sep 2019 07:22:58 +0800 Subject: [PATCH 07/15] =?UTF-8?q?fix=20when=20construct=20HiveClientiImpl?= =?UTF-8?q?=E2=80=98s=20state?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../hive/thriftserver/SparkSQLCLIDriver.scala | 11 ------- .../sql/hive/thriftserver/CliSuite.scala | 29 ------------------- 2 files changed, 40 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index a8e8d04484cf0..9f554b200f775 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -165,17 +165,6 @@ private[hive] object SparkSQLCLIDriver extends Logging { StringUtils.split(auxJars, ",").foreach(resourceLoader.addJar(_)) } - SessionState.get().getConf.setClassLoader(SparkSQLEnv.sqlContext.sharedState.jarClassLoader) - - // In SparkSQL CLI, we may want to use jars augmented by hiveconf - // hive.aux.jars.path, here we add jars augmented by hiveconf to - // Spark's SessionResourceLoader to obtain these jars. - val auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS) - if (StringUtils.isNotBlank(auxJars)) { - val resourceLoader = SparkSQLEnv.sqlContext.sessionState.resourceLoader - StringUtils.split(auxJars, ",").foreach(resourceLoader.addJar(_)) - } - // TODO work around for set the log output to console, because the HiveContext // will set the output into an invalid buffer. sessionState.in = System.in diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 83cad24758e17..567b3dc605054 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -227,36 +227,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { ) } - test("Commands using SerDe provided jars in conf hive.aux.jars.path") { - - val dataFilePath = - Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") - val hiveContribJar = HiveTestUtils.getHiveContribJar.getCanonicalPath - - runCliWithin( - 3.minute, - Seq("--conf", s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))( - """CREATE TABLE addJarWithHiveAux(key string, val string) - |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'; - """.stripMargin - -> "", - "CREATE TABLE sourceTableForWithHiveAux (key INT, val STRING);" - -> "", - s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTableForWithHiveAux;" - -> "", - "INSERT INTO TABLE addJarWithHiveAux SELECT key, val FROM sourceTableForWithHiveAux;" - -> "", - "SELECT collect_list(array(val)) FROM addJarWithHiveAux;" - -> """[["val_238"],["val_86"],["val_311"],["val_27"],["val_165"]]""", - "DROP TABLE addJarWithHiveAux;" - -> "", - "DROP TABLE sourceTableForWithHiveAux;" - -> "" - ) - } - test("Commands using SerDe provided in --hive.aux.jars.path") { - val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") val hiveContribJar = HiveTestUtils.getHiveContribJar.getCanonicalPath From 88169fccbc9af28f7400d324b15a4f1b1a244a4d Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 17 Sep 2019 09:36:44 +0800 Subject: [PATCH 08/15] change UT table name --- .../spark/sql/hive/thriftserver/CliSuite.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 567b3dc605054..4103fff8d865b 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -235,21 +235,21 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { runCliWithin( 3.minute, Seq("--conf", s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))( - """CREATE TABLE t1(key string, val string) + """CREATE TABLE addJarWithHiveAux(key string, val string) |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'; """.stripMargin -> "", - "CREATE TABLE sourceTable (key INT, val STRING);" + "CREATE TABLE sourceTableForWithHiveAux (key INT, val STRING);" -> "", - s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;" + s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTableForWithHiveAux;" -> "", - "INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;" + "INSERT INTO TABLE addJarWithHiveAux SELECT key, val FROM sourceTableForWithHiveAux;" -> "", - "SELECT collect_list(array(val)) FROM t1;" + "SELECT collect_list(array(val)) FROM addJarWithHiveAux;" -> """[["val_238"],["val_86"],["val_311"],["val_27"],["val_165"]]""", - "DROP TABLE t1;" + "DROP TABLE addJarWithHiveAux;" -> "", - "DROP TABLE sourceTable;" + "DROP TABLE sourceTableForWithHiveAux;" -> "" ) } From 4a18ceb1ff75d94888a1e8e603566407c2bd9bf2 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 18 Sep 2019 17:19:34 +0800 Subject: [PATCH 09/15] remove empty lines --- .../scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 4103fff8d865b..a628bc8134dc7 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -231,7 +231,6 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") val hiveContribJar = HiveTestUtils.getHiveContribJar.getCanonicalPath - runCliWithin( 3.minute, Seq("--conf", s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))( @@ -364,7 +363,6 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") val hiveContribJar = HiveTestUtils.getHiveContribJar.getCanonicalPath - runCliWithin( 3.minute)( s"ADD JAR ${hiveContribJar};" -> "", From 7692893d8ca1474c04407cd412dce416f0709af6 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 29 Sep 2019 10:44:19 +0800 Subject: [PATCH 10/15] change UT title --- .../scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index a628bc8134dc7..13909d86946ca 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -227,7 +227,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { ) } - test("Commands using SerDe provided in --hive.aux.jars.path") { + test("SPARK-29022: Commands using SerDe provided in --hive.aux.jars.path") { val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") val hiveContribJar = HiveTestUtils.getHiveContribJar.getCanonicalPath From c108ecacc74e80c9597777873b7be4dc2fc139d7 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 29 Sep 2019 12:07:03 +0800 Subject: [PATCH 11/15] merge master --- .../sql/hive/thriftserver/CliSuite.scala | 52 +++++++++++++++++++ .../sql/hive/client/HiveClientImpl.scala | 7 +++ 2 files changed, 59 insertions(+) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index db33d1d4a07dd..a8fef18f05ebf 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -226,6 +226,32 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { ) } + test("SPARK-29022: Commands using SerDe provided in --hive.aux.jars.path") { + val dataFilePath = + Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") + val hiveContribJar = HiveTestJars.getHiveContribJar().getCanonicalPath + runCliWithin( + 3.minute, + Seq("--conf", s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))( + """CREATE TABLE addJarWithHiveAux(key string, val string) + |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'; + """.stripMargin + -> "", + "CREATE TABLE sourceTableForWithHiveAux (key INT, val STRING);" + -> "", + s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTableForWithHiveAux;" + -> "", + "INSERT INTO TABLE addJarWithHiveAux SELECT key, val FROM sourceTableForWithHiveAux;" + -> "", + "SELECT collect_list(array(val)) FROM addJarWithHiveAux;" + -> """[["val_238"],["val_86"],["val_311"],["val_27"],["val_165"]]""", + "DROP TABLE addJarWithHiveAux;" + -> "", + "DROP TABLE sourceTableForWithHiveAux;" + -> "" + ) + } + test("SPARK-11188 Analysis error reporting") { runCliWithin(timeout = 2.minute, errorResponses = Seq("AnalysisException"))( @@ -332,4 +358,30 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { "SELECT concat_ws(',', 'First', example_max(1234321), 'Third');" -> "First,1234321,Third" ) } + + test("SPARK-29022 Commands using SerDe provided in ADD JAR sql") { + val dataFilePath = + Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") + val hiveContribJar = HiveTestJars.getHiveContribJar().getCanonicalPath + runCliWithin( + 3.minute)( + s"ADD JAR ${hiveContribJar};" -> "", + """CREATE TABLE addJarWithSQL(key string, val string) + |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'; + """.stripMargin + -> "", + "CREATE TABLE sourceTableForWithSQL(key INT, val STRING);" + -> "", + s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTableForWithSQL;" + -> "", + "INSERT INTO TABLE addJarWithSQL SELECT key, val FROM sourceTableForWithSQL;" + -> "", + "SELECT collect_list(array(val)) FROM addJarWithSQL;" + -> """[["val_238"],["val_86"],["val_311"],["val_27"],["val_165"]]""", + "DROP TABLE addJarWithSQL;" + -> "", + "DROP TABLE sourceTableForWithSQL;" + -> "" + ) + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 66afe4c82338d..157e9c59ea717 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -145,6 +145,13 @@ private[hive] class HiveClientImpl( warehouseDir.foreach { dir => ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir) } + // ret != null means we have a CliSessionState instance in current thread which initialized + // by SparkSQLCLIDriver. The class loader of CliSessionState's conf is current main thread's + // class loader used to load jars passed by --jars. One class loader used by AddJarCommand + // is clientLoader.classLoader which contain jar path passed by --jars in main thread. + // We set CliSessionState's conf class loader to clientLoader.classLoader. Thus we can load + // all jars passed by --jars and AddJarCommand. + ret.getConf.setClassLoader(clientLoader.classLoader) ret } else { newState() From 8633fafb747bf39b860fba96cf69851904644c11 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 29 Sep 2019 12:15:23 +0800 Subject: [PATCH 12/15] Use new method --- .../org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index e8ee1d4b0feb3..a8fef18f05ebf 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -229,7 +229,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { test("SPARK-29022: Commands using SerDe provided in --hive.aux.jars.path") { val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") - val hiveContribJar = HiveTestUtils.getHiveContribJar.getCanonicalPath + val hiveContribJar = HiveTestJars.getHiveContribJar().getCanonicalPath runCliWithin( 3.minute, Seq("--conf", s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))( @@ -362,7 +362,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { test("SPARK-29022 Commands using SerDe provided in ADD JAR sql") { val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") - val hiveContribJar = HiveTestUtils.getHiveContribJar.getCanonicalPath + val hiveContribJar = HiveTestJars.getHiveContribJar().getCanonicalPath runCliWithin( 3.minute)( s"ADD JAR ${hiveContribJar};" -> "", From db63cf2274f6f2c32913f84fb2a08c79425fc8dc Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 29 Sep 2019 19:22:26 +0800 Subject: [PATCH 13/15] use right jar --- .../org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index a8fef18f05ebf..f3063675a79f7 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -229,7 +229,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { test("SPARK-29022: Commands using SerDe provided in --hive.aux.jars.path") { val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") - val hiveContribJar = HiveTestJars.getHiveContribJar().getCanonicalPath + val hiveContribJar = HiveTestJars.getHiveHcatalogCoreJar().getCanonicalPath runCliWithin( 3.minute, Seq("--conf", s"spark.hadoop.${ConfVars.HIVEAUXJARS}=$hiveContribJar"))( @@ -362,7 +362,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { test("SPARK-29022 Commands using SerDe provided in ADD JAR sql") { val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt") - val hiveContribJar = HiveTestJars.getHiveContribJar().getCanonicalPath + val hiveContribJar = HiveTestJars.getHiveHcatalogCoreJar().getCanonicalPath runCliWithin( 3.minute)( s"ADD JAR ${hiveContribJar};" -> "", From 918941ed4862740d7f6710aae77c7fd4018c42d1 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 29 Sep 2019 21:49:19 +0800 Subject: [PATCH 14/15] Only reset classloader when cli mode --- .../sql/hive/client/HiveClientImpl.scala | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 157e9c59ea717..1bb4811f8ddbe 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -145,13 +145,20 @@ private[hive] class HiveClientImpl( warehouseDir.foreach { dir => ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir) } - // ret != null means we have a CliSessionState instance in current thread which initialized - // by SparkSQLCLIDriver. The class loader of CliSessionState's conf is current main thread's - // class loader used to load jars passed by --jars. One class loader used by AddJarCommand - // is clientLoader.classLoader which contain jar path passed by --jars in main thread. - // We set CliSessionState's conf class loader to clientLoader.classLoader. Thus we can load - // all jars passed by --jars and AddJarCommand. - ret.getConf.setClassLoader(clientLoader.classLoader) + // Since in jdk11, when HiveClient's withHiveSate, it will set ThreadLocal SessionState, + // In HiveThriftServer2, it will call HiveUtils.newClientForExecution() to get a client + // for Execution, then that method will trigger here to execute, in that case we can't reset + // ret.getConf's ClassLoader. + if (HiveUtils.isCliSessionState) { + // ret != null means we have a CliSessionState instance in current thread which + // initialized by SparkSQLCLIDriver. The class loader of CliSessionState's conf is + // current main thread's class loader used to load jars passed by --jars. + // One class loader used by AddJarCommand is clientLoader.classLoader which contain + // jar path passed by --jars in main thread. We set CliSessionState's conf class loader + // to clientLoader.classLoader. + // Thus we can load all jars passed by --jars and AddJarCommand. + ret.getConf.setClassLoader(clientLoader.classLoader) + } ret } else { newState() From 72d4d84cbd60829c6856075674de12ee59abd7d8 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Mon, 30 Sep 2019 09:42:47 +0800 Subject: [PATCH 15/15] fix in SparkSQLCliDriver --- .../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 7 +++++++ .../spark/sql/hive/client/HiveClientImpl.scala | 14 -------------- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 9f554b200f775..323b851774abc 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -165,6 +165,13 @@ private[hive] object SparkSQLCLIDriver extends Logging { StringUtils.split(auxJars, ",").foreach(resourceLoader.addJar(_)) } + // The class loader of CliSessionState's conf is current main thread's class loader + // used to load jars passed by --jars. One class loader used by AddJarCommand is + // sharedState.jarClassLoader which contain jar path passed by --jars in main thread. + // We set CliSessionState's conf class loader to sharedState.jarClassLoader. + // Thus we can load all jars passed by --jars and AddJarCommand. + sessionState.getConf.setClassLoader(SparkSQLEnv.sqlContext.sharedState.jarClassLoader) + // TODO work around for set the log output to console, because the HiveContext // will set the output into an invalid buffer. sessionState.in = System.in diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 1bb4811f8ddbe..66afe4c82338d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -145,20 +145,6 @@ private[hive] class HiveClientImpl( warehouseDir.foreach { dir => ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir) } - // Since in jdk11, when HiveClient's withHiveSate, it will set ThreadLocal SessionState, - // In HiveThriftServer2, it will call HiveUtils.newClientForExecution() to get a client - // for Execution, then that method will trigger here to execute, in that case we can't reset - // ret.getConf's ClassLoader. - if (HiveUtils.isCliSessionState) { - // ret != null means we have a CliSessionState instance in current thread which - // initialized by SparkSQLCLIDriver. The class loader of CliSessionState's conf is - // current main thread's class loader used to load jars passed by --jars. - // One class loader used by AddJarCommand is clientLoader.classLoader which contain - // jar path passed by --jars in main thread. We set CliSessionState's conf class loader - // to clientLoader.classLoader. - // Thus we can load all jars passed by --jars and AddJarCommand. - ret.getConf.setClassLoader(clientLoader.classLoader) - } ret } else { newState()