From c3de557ee383f3bb96ab5401db146c4cf2a13124 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 10 Sep 2019 17:44:59 +0800 Subject: [PATCH 01/22] save change --- .../hive/execution/HiveTableScanExec.scala | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 5b00e2ebafa43..1b0e5a0c0bb29 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -20,8 +20,11 @@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.common.JavaUtils +import org.apache.hadoop.hive.ql.exec.Utilities import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption @@ -120,6 +123,36 @@ case class HiveTableScanExec( HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name)) + logInfo(s"Test ADD JAR ${SessionState.get()}") + if (SessionState.get() != null) { + logInfo(s"Test ADD JAR ${SessionState.get().getConf.getClassLoader}") + logInfo("Test ADD JAR with SessionState.getConf.getClassLoader") + // scalastyle:off + try { + Class.forName(tableDesc.getSerdeClassName(), true, SessionState.get().getConf.getClassLoader) + } catch { + case e:Exception => + logInfo("Failed Test ADD JAR with SessionState.getConf.getClassLoader") + } + } + + logInfo(s"JavaUtils.getClassLoader => ${JavaUtils.getClassLoader}") + logInfo(s"SessionState.SharedState.jarClssLoader => ${sparkSession.sharedState.jarClassLoader}") + logInfo("Test ADD JAR with sharedState's JarClassloader") + // scalastyle:off + try { + Class.forName(tableDesc.getSerdeClassName(), true, sparkSession.sharedState.jarClassLoader) + } catch { + case e: Exception => + logInfo("Failed Test ADD JAR with sharedState's JarClassloader") + } + logInfo("Test ADD JAR with JavaUtils.getClassLoader") + try { + Class.forName(tableDesc.getSerdeClassName(), true, JavaUtils.getClassLoader) + } catch { + case e: Exception => + logInfo("Failed Test ADD JAR with JavaUtils.getClassLoader") + } val deserializer = tableDesc.getDeserializerClass.getConstructor().newInstance() deserializer.initialize(hiveConf, tableDesc.getProperties) From 2cf3153f046a25e38173fad0cddc1c581ed42b93 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 10 Sep 2019 19:05:07 +0800 Subject: [PATCH 02/22] Revert "save change" This reverts commit c3de557ee383f3bb96ab5401db146c4cf2a13124. 
--- .../hive/execution/HiveTableScanExec.scala | 33 ------------------- 1 file changed, 33 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 1b0e5a0c0bb29..5b00e2ebafa43 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -20,11 +20,8 @@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hive.common.JavaUtils -import org.apache.hadoop.hive.ql.exec.Utilities import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption @@ -123,36 +120,6 @@ case class HiveTableScanExec( HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name)) - logInfo(s"Test ADD JAR ${SessionState.get()}") - if (SessionState.get() != null) { - logInfo(s"Test ADD JAR ${SessionState.get().getConf.getClassLoader}") - logInfo("Test ADD JAR with SessionState.getConf.getClassLoader") - // scalastyle:off - try { - Class.forName(tableDesc.getSerdeClassName(), true, SessionState.get().getConf.getClassLoader) - } catch { - case e:Exception => - logInfo("Failed Test ADD JAR with SessionState.getConf.getClassLoader") - } - } - - logInfo(s"JavaUtils.getClassLoader => ${JavaUtils.getClassLoader}") - logInfo(s"SessionState.SharedState.jarClssLoader => ${sparkSession.sharedState.jarClassLoader}") - logInfo("Test ADD JAR with sharedState's JarClassloader") - // scalastyle:off - try { - Class.forName(tableDesc.getSerdeClassName(), true, sparkSession.sharedState.jarClassLoader) - } catch { - case e: Exception => - logInfo("Failed Test ADD JAR with sharedState's JarClassloader") - } - logInfo("Test ADD JAR with JavaUtils.getClassLoader") - try { - Class.forName(tableDesc.getSerdeClassName(), true, JavaUtils.getClassLoader) - } catch { - case e: Exception => - logInfo("Failed Test ADD JAR with JavaUtils.getClassLoader") - } val deserializer = tableDesc.getDeserializerClass.getConstructor().newInstance() deserializer.initialize(hiveConf, tableDesc.getProperties) From edaf4022040234cb4e1cfbd5b73f0856cab0ae15 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 12 Sep 2019 23:35:26 +0800 Subject: [PATCH 03/22] fix bug of jdk11 --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 5b2eeb2cf34c0..c2f707f338006 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -311,6 +311,7 @@ private[hive] class HiveClientImpl( val ret = try f finally { state.getConf.setClassLoader(originalConfLoader) Thread.currentThread().setContextClassLoader(original) + SessionState.detachSession() HiveCatalogMetrics.incrementHiveClientCalls(1) } ret From 7ec01e48deab55b8ee7cc7701d16d43b8d55a96f Mon Sep 17 00:00:00 
2001 From: angerszhu Date: Fri, 13 Sep 2019 02:10:11 +0800 Subject: [PATCH 04/22] Revert "fix bug of jdk11" This reverts commit edaf4022040234cb4e1cfbd5b73f0856cab0ae15. --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index c2f707f338006..5b2eeb2cf34c0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -311,7 +311,6 @@ private[hive] class HiveClientImpl( val ret = try f finally { state.getConf.setClassLoader(originalConfLoader) Thread.currentThread().setContextClassLoader(original) - SessionState.detachSession() HiveCatalogMetrics.incrementHiveClientCalls(1) } ret From 098a4321f17874bd00cbc63a328ef0e612b32c80 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 13 Sep 2019 02:13:06 +0800 Subject: [PATCH 05/22] fix only this point --- .../apache/spark/sql/hive/execution/HiveTableScanExec.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 5b00e2ebafa43..2a64ea4ceb091 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.hive._ -import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BooleanType, DataType} import org.apache.spark.util.Utils @@ -87,6 +87,8 @@ case class HiveTableScanExec( BindReferences.bindReference(pred, relation.partitionCols) } + @transient private lazy val hiveClient: HiveClient = sparkSession.sharedState.externalCatalog + .unwrapped.asInstanceOf[HiveExternalCatalog].client @transient private lazy val hiveQlTable = HiveClientImpl.toHiveTable(relation.tableMeta) @transient private lazy val tableDesc = new TableDesc( hiveQlTable.getInputFormatClass, @@ -95,7 +97,7 @@ case class HiveTableScanExec( // Create a local copy of hadoopConf,so that scan specific modifications should not impact // other queries - @transient private lazy val hadoopConf = { + @transient private lazy val hadoopConf = hiveClient.withHiveState { val c = sparkSession.sessionState.newHadoopConf() // append columns ids and names before broadcast addColumnMetadataToConf(c) From 07dd32ea8c01be2bb997f18757c43988df9ea408 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 13 Sep 2019 08:21:59 +0800 Subject: [PATCH 06/22] still detach SessionState --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 4 ++++ .../apache/spark/sql/hive/execution/HiveTableScanExec.scala | 4 +--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 5b2eeb2cf34c0..423970f343381 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -294,6 +294,7 @@ private[hive] class HiveClientImpl( def withHiveState[A](f: => A): A = retryLocked { val original = Thread.currentThread().getContextClassLoader val originalConfLoader = state.getConf.getClassLoader + val originState = SessionState.get() // The classloader in clientLoader could be changed after addJar, always use the latest // classloader. We explicitly set the context class loader since "conf.setClassLoader" does // not do that, and the Hive client libraries may need to load classes defined by the client's @@ -311,6 +312,9 @@ private[hive] class HiveClientImpl( val ret = try f finally { state.getConf.setClassLoader(originalConfLoader) Thread.currentThread().setContextClassLoader(original) + if (originState != null) { + SessionState.setCurrentSessionState(originState) + } HiveCatalogMetrics.incrementHiveClientCalls(1) } ret diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 2a64ea4ceb091..9203cf5645cbd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -87,8 +87,6 @@ case class HiveTableScanExec( BindReferences.bindReference(pred, relation.partitionCols) } - @transient private lazy val hiveClient: HiveClient = sparkSession.sharedState.externalCatalog - .unwrapped.asInstanceOf[HiveExternalCatalog].client @transient private lazy val hiveQlTable = HiveClientImpl.toHiveTable(relation.tableMeta) @transient private lazy val tableDesc = new TableDesc( hiveQlTable.getInputFormatClass, @@ -97,7 +95,7 @@ case class HiveTableScanExec( // Create a local copy of hadoopConf,so that scan specific modifications should not impact // other queries - @transient private lazy val hadoopConf = hiveClient.withHiveState { + @transient private lazy val hadoopConf = { val c = sparkSession.sessionState.newHadoopConf() // append columns ids and names before broadcast addColumnMetadataToConf(c) From 35f0a015f42f22cb3f3f5562349118a1c0421883 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 13 Sep 2019 09:07:13 +0800 Subject: [PATCH 07/22] remove unused import --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 ++ .../org/apache/spark/sql/hive/execution/HiveTableScanExec.scala | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 423970f343381..02ba0feaf6452 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -314,6 +314,8 @@ private[hive] class HiveClientImpl( Thread.currentThread().setContextClassLoader(original) if (originState != null) { SessionState.setCurrentSessionState(originState) + } else { + SessionState.detachSession() } HiveCatalogMetrics.incrementHiveClientCalls(1) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 9203cf5645cbd..5b00e2ebafa43 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.hive._ -import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl} +import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BooleanType, DataType} import org.apache.spark.util.Utils From fb39a53e315db707383db5aa578de9cc92254c88 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Fri, 13 Sep 2019 11:33:38 +0800 Subject: [PATCH 08/22] fix in HiveTableScanExec --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 6 ------ .../apache/spark/sql/hive/execution/HiveTableScanExec.scala | 5 +++++ 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 02ba0feaf6452..5b2eeb2cf34c0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -294,7 +294,6 @@ private[hive] class HiveClientImpl( def withHiveState[A](f: => A): A = retryLocked { val original = Thread.currentThread().getContextClassLoader val originalConfLoader = state.getConf.getClassLoader - val originState = SessionState.get() // The classloader in clientLoader could be changed after addJar, always use the latest // classloader. We explicitly set the context class loader since "conf.setClassLoader" does // not do that, and the Hive client libraries may need to load classes defined by the client's @@ -312,11 +311,6 @@ private[hive] class HiveClientImpl( val ret = try f finally { state.getConf.setClassLoader(originalConfLoader) Thread.currentThread().setContextClassLoader(original) - if (originState != null) { - SessionState.setCurrentSessionState(originState) - } else { - SessionState.detachSession() - } HiveCatalogMetrics.incrementHiveClientCalls(1) } ret diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 5b00e2ebafa43..8cd2579bb8098 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -22,6 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption @@ -120,6 +121,10 @@ case class HiveTableScanExec( HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name)) + val currentState = SessionState.get() + if (currentState != null) { + currentState.getConf.setClassLoader(Thread.currentThread().getContextClassLoader) + } val deserializer = tableDesc.getDeserializerClass.getConstructor().newInstance() 
deserializer.initialize(hiveConf, tableDesc.getProperties) From a510a23f9ebd9090ed0184d37048bb79ea17830a Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sat, 14 Sep 2019 15:22:14 +0800 Subject: [PATCH 09/22] reset origin classloader --- .../spark/sql/hive/execution/HiveTableScanExec.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 8cd2579bb8098..4b74bb1207ae4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.hive.serde2.Deserializer import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession @@ -122,12 +123,18 @@ case class HiveTableScanExec( HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name)) val currentState = SessionState.get() - if (currentState != null) { + val deserializer = if (currentState != null) { + val originClassLoader = currentState.getConf.getClassLoader currentState.getConf.setClassLoader(Thread.currentThread().getContextClassLoader) + val instance = tableDesc.getDeserializerClass.getConstructor().newInstance() + currentState.getConf.setClassLoader(originClassLoader) + instance + } else { + tableDesc.getDeserializerClass.getConstructor().newInstance() } - val deserializer = tableDesc.getDeserializerClass.getConstructor().newInstance() deserializer.initialize(hiveConf, tableDesc.getProperties) + // Specifies types and object inspectors of columns to be scanned. 
val structOI = ObjectInspectorUtils .getStandardObjectInspector( From bc736630a118907a72a3e8d59f109fbc90105995 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sat, 14 Sep 2019 15:34:54 +0800 Subject: [PATCH 10/22] fix scala style --- .../org/apache/spark/sql/hive/execution/HiveTableScanExec.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 4b74bb1207ae4..b21d4d088245b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -27,7 +27,6 @@ import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils -import org.apache.hadoop.hive.serde2.Deserializer import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession From f772e492879d7d8706d66f31709846ed40e3fc5f Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 15 Sep 2019 16:49:52 +0800 Subject: [PATCH 11/22] use sharedstate's classloader --- .../org/apache/spark/sql/hive/execution/HiveTableScanExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index b21d4d088245b..b3ecb1dae047d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -124,7 +124,7 @@ case class HiveTableScanExec( val currentState = SessionState.get() val deserializer = if (currentState != null) { val originClassLoader = currentState.getConf.getClassLoader - currentState.getConf.setClassLoader(Thread.currentThread().getContextClassLoader) + currentState.getConf.setClassLoader(sparkSession.sharedState.jarClassLoader) val instance = tableDesc.getDeserializerClass.getConstructor().newInstance() currentState.getConf.setClassLoader(originClassLoader) instance From c213b07419bf475df21ba458a41ac1070db84177 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 15 Sep 2019 18:28:37 +0800 Subject: [PATCH 12/22] fix null SessionState --- .../apache/spark/sql/hive/execution/HiveTableScanExec.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index b3ecb1dae047d..a2123c11df923 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -129,7 +129,11 @@ case class HiveTableScanExec( currentState.getConf.setClassLoader(originClassLoader) instance } else { - tableDesc.getDeserializerClass.getConstructor().newInstance() + val originClassLoader = Thread.currentThread().getContextClassLoader + Thread.currentThread().setContextClassLoader(sparkSession.sharedState.jarClassLoader) + val instance = tableDesc.getDeserializerClass.getConstructor().newInstance() + Thread.currentThread().setContextClassLoader(originClassLoader) + instance } 
deserializer.initialize(hiveConf, tableDesc.getProperties) From c751ce6e221f133a85e434e9c34746ae950b0599 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 15 Sep 2019 18:28:43 +0800 Subject: [PATCH 13/22] save code --- .../apache/spark/sql/hive/TableReader.scala | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 3f9925e73705e..d64713bdbc51e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -91,11 +91,30 @@ class HadoopTableReader( override def conf: SQLConf = sparkSession.sessionState.conf - override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] = + override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] = { + logInfo(s"${sparkSession.sharedState.jarClassLoader}") + logInfo(s"${Thread.currentThread().getContextClassLoader}") + logInfo(s"${Utils.getSparkClassLoader}") + + var x: RDD[InternalRow] = null + val origin = Thread.currentThread().getContextClassLoader + try { + logInfo("Use sharedState.jarClassLoader") + Thread.currentThread().setContextClassLoader(sparkSession.sharedState.jarClassLoader) + makeRDDForTable( + hiveTable, + Utils.classForName[Deserializer](tableDesc.getSerdeClassName), + filterOpt = None) + } catch { + case e: Exception => logError("Failed with sharedState.jarClassLoader", e) + } + Thread.currentThread().setContextClassLoader(origin) makeRDDForTable( hiveTable, Utils.classForName[Deserializer](tableDesc.getSerdeClassName), filterOpt = None) + } + /** * Creates a Hadoop RDD to read data from the target table's data directory. 
Returns a transformed From 6e70b744a92f48cefdf0c009f750ae744f31b91e Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 15 Sep 2019 19:50:59 +0800 Subject: [PATCH 14/22] add classloader when doexecute in HiveTableScanExec --- .../org/apache/spark/sql/hive/TableReader.scala | 17 ----------------- .../sql/hive/execution/HiveTableScanExec.scala | 7 ++----- 2 files changed, 2 insertions(+), 22 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index d64713bdbc51e..d8ba7b17b8522 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -92,23 +92,6 @@ class HadoopTableReader( override def conf: SQLConf = sparkSession.sessionState.conf override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] = { - logInfo(s"${sparkSession.sharedState.jarClassLoader}") - logInfo(s"${Thread.currentThread().getContextClassLoader}") - logInfo(s"${Utils.getSparkClassLoader}") - - var x: RDD[InternalRow] = null - val origin = Thread.currentThread().getContextClassLoader - try { - logInfo("Use sharedState.jarClassLoader") - Thread.currentThread().setContextClassLoader(sparkSession.sharedState.jarClassLoader) - makeRDDForTable( - hiveTable, - Utils.classForName[Deserializer](tableDesc.getSerdeClassName), - filterOpt = None) - } catch { - case e: Exception => logError("Failed with sharedState.jarClassLoader", e) - } - Thread.currentThread().setContextClassLoader(origin) makeRDDForTable( hiveTable, Utils.classForName[Deserializer](tableDesc.getSerdeClassName), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index a2123c11df923..7bb290c8c2604 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -129,11 +129,7 @@ case class HiveTableScanExec( currentState.getConf.setClassLoader(originClassLoader) instance } else { - val originClassLoader = Thread.currentThread().getContextClassLoader - Thread.currentThread().setContextClassLoader(sparkSession.sharedState.jarClassLoader) - val instance = tableDesc.getDeserializerClass.getConstructor().newInstance() - Thread.currentThread().setContextClassLoader(originClassLoader) - instance + tableDesc.getDeserializerClass.getConstructor().newInstance() } deserializer.initialize(hiveConf, tableDesc.getProperties) @@ -196,6 +192,7 @@ case class HiveTableScanExec( } protected override def doExecute(): RDD[InternalRow] = { + Thread.currentThread().setContextClassLoader(sparkSession.sharedState.jarClassLoader) // Using dummyCallSite, as getCallSite can turn out to be expensive with // multiple partitions. 
val rdd = if (!relation.isPartitioned) { From a02d0b61fc6a47a25959f73c63f40f00c5ee588e Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 15 Sep 2019 20:08:29 +0800 Subject: [PATCH 15/22] change back code --- .../main/scala/org/apache/spark/sql/hive/TableReader.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index d8ba7b17b8522..3f9925e73705e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -91,13 +91,11 @@ class HadoopTableReader( override def conf: SQLConf = sparkSession.sessionState.conf - override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] = { + override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] = makeRDDForTable( hiveTable, Utils.classForName[Deserializer](tableDesc.getSerdeClassName), filterOpt = None) - } - /** * Creates a Hadoop RDD to read data from the target table's data directory. Returns a transformed From 01122e3d16d36ea49c767059c9f48cfb122399e1 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 15 Sep 2019 20:19:19 +0800 Subject: [PATCH 16/22] remove empty line --- .../apache/spark/sql/hive/execution/HiveTableScanExec.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 7bb290c8c2604..301b4dd5df840 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -132,8 +132,7 @@ case class HiveTableScanExec( tableDesc.getDeserializerClass.getConstructor().newInstance() } deserializer.initialize(hiveConf, tableDesc.getProperties) - - + // Specifies types and object inspectors of columns to be scanned. val structOI = ObjectInspectorUtils .getStandardObjectInspector( From 340e0b6a1537a0df04b4519136f9d8524c5202ee Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 15 Sep 2019 20:27:59 +0800 Subject: [PATCH 17/22] remove space --- .../org/apache/spark/sql/hive/execution/HiveTableScanExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 301b4dd5df840..5958f7dff448f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -132,7 +132,7 @@ case class HiveTableScanExec( tableDesc.getDeserializerClass.getConstructor().newInstance() } deserializer.initialize(hiveConf, tableDesc.getProperties) - + // Specifies types and object inspectors of columns to be scanned. val structOI = ObjectInspectorUtils .getStandardObjectInspector( From 35c961f138de759defc755039ed06e92e012d2d0 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Mon, 16 Sep 2019 19:11:15 +0800 Subject: [PATCH 18/22] solve this when construct HiveClientImpl's SessionState. 
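Instead of swapping class loaders at every use site in HiveTableScanExec, fix it once at the source: when HiveClientImpl constructs its SessionState, reset the conf's class loader before SessionState.start() is called, so the Hive side keeps a loader that can see our jars. The idea in short (the full change is in the diff below):

    // in HiveClientImpl, just before starting the freshly built SessionState:
    state.getConf.setClassLoader(initClassLoader)
    SessionState.start(state)

This also lets us revert the deserializer-construction workaround in HiveTableScanExec from the earlier commits.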
--- .../spark/sql/hive/client/HiveClientImpl.scala | 1 + .../sql/hive/execution/HiveTableScanExec.scala | 13 +------------ 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 5b2eeb2cf34c0..f1403d0a33c54 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -190,6 +190,7 @@ private[hive] class HiveClientImpl( if (clientLoader.cachedHive != null) { Hive.set(clientLoader.cachedHive.asInstanceOf[Hive]) } + state.getConf.setClassLoader(initClassLoader) SessionState.start(state) state.out = new PrintStream(outputBuffer, true, UTF_8.name()) state.err = new PrintStream(outputBuffer, true, UTF_8.name()) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 5958f7dff448f..5b00e2ebafa43 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -22,7 +22,6 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption @@ -121,16 +120,7 @@ case class HiveTableScanExec( HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name)) - val currentState = SessionState.get() - val deserializer = if (currentState != null) { - val originClassLoader = currentState.getConf.getClassLoader - currentState.getConf.setClassLoader(sparkSession.sharedState.jarClassLoader) - val instance = tableDesc.getDeserializerClass.getConstructor().newInstance() - currentState.getConf.setClassLoader(originClassLoader) - instance - } else { - tableDesc.getDeserializerClass.getConstructor().newInstance() - } + val deserializer = tableDesc.getDeserializerClass.getConstructor().newInstance() deserializer.initialize(hiveConf, tableDesc.getProperties) // Specifies types and object inspectors of columns to be scanned. @@ -191,7 +181,6 @@ case class HiveTableScanExec( } protected override def doExecute(): RDD[InternalRow] = { - Thread.currentThread().setContextClassLoader(sparkSession.sharedState.jarClassLoader) // Using dummyCallSite, as getCallSite can turn out to be expensive with // multiple partitions. 
val rdd = if (!relation.isPartitioned) { From f30d001d4b29257df4fbec9415a642916823f020 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 17 Sep 2019 17:26:27 +0800 Subject: [PATCH 19/22] fix and add comment --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index f1403d0a33c54..260e45640218f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -161,7 +161,7 @@ private[hive] class HiveClientImpl( // HiveConf is a Hadoop Configuration, which has a field of classLoader and // the initial value will be the current thread's context class loader // (i.e. initClassLoader at here). - // We call initialConf.setClassLoader(initClassLoader) at here to make + // We call hiveConf.setClassLoader(initClassLoader) at here to make // this action explicit. hiveConf.setClassLoader(initClassLoader) @@ -190,6 +190,12 @@ private[hive] class HiveClientImpl( if (clientLoader.cachedHive != null) { Hive.set(clientLoader.cachedHive.asInstanceOf[Hive]) } + // For hive versions higher than 2.3, when we initialize SessionState, it will set + // a new UDFClassLoader as hiveConf's classLoader. When we use AddJarCommand to + // add a jar, it is added to clientLoader.classLoader, and then the jar won't be found + // in hiveConf's classLoader. Here we reset it to clientLoader.classLoader, which contains + // jars passed by --jars and is loaded by the main thread's classLoader. Thus we can load all jars + // passed by --jars and AddJarCommand. state.getConf.setClassLoader(initClassLoader) SessionState.start(state) state.out = new PrintStream(outputBuffer, true, UTF_8.name()) state.err = new PrintStream(outputBuffer, true, UTF_8.name()) From 4db3b547a71d6dfaa4548cfd460ad860eb694522 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 18 Sep 2019 01:38:17 +0800 Subject: [PATCH 20/22] use clientLoader.classLoader --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 260e45640218f..045d61d59f3c2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -196,7 +196,7 @@ private[hive] class HiveClientImpl( // in hiveConf's classLoader. Here we reset it to clientLoader.classLoader, which contains // jars passed by --jars and is loaded by the main thread's classLoader. Thus we can load all jars // passed by --jars and AddJarCommand.
- state.getConf.setClassLoader(initClassLoader) + state.getConf.setClassLoader(clientLoader.classLoader) SessionState.start(state) state.out = new PrintStream(outputBuffer, true, UTF_8.name()) state.err = new PrintStream(outputBuffer, true, UTF_8.name()) From 8e1734674f23828b971b64f703eaea9ae17a6874 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 25 Sep 2019 21:17:55 +0800 Subject: [PATCH 21/22] fix hive version wrong in comment --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 045d61d59f3c2..0e9791c48e7ee 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -190,7 +190,7 @@ private[hive] class HiveClientImpl( if (clientLoader.cachedHive != null) { Hive.set(clientLoader.cachedHive.asInstanceOf[Hive]) } - // For hive versions higher than 2.3, when we initialize SessionState, it will set + // For hive versions >= 2.0, when we initialize SessionState, it will set // a new UDFClassLoader as hiveConf's classLoader. When we use AddJarCommand to // add a jar, it is added to clientLoader.classLoader, and then the jar won't be found // in hiveConf's classLoader. Here we reset it to clientLoader.classLoader, which contains From f37335cda7602934271695c878584292cf6c9251 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 25 Sep 2019 23:32:45 +0800 Subject: [PATCH 22/22] change comment and add Hive version condition --- .../spark/sql/hive/client/HiveClientImpl.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 0e9791c48e7ee..19bd41b872da3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -190,13 +190,13 @@ private[hive] class HiveClientImpl( if (clientLoader.cachedHive != null) { Hive.set(clientLoader.cachedHive.asInstanceOf[Hive]) } - // For hive versions >= 2.0, when we initialize SessionState, it will set - // a new UDFClassLoader as hiveConf's classLoader. When we use AddJarCommand to - // add a jar, it is added to clientLoader.classLoader, and then the jar won't be found - // in hiveConf's classLoader. Here we reset it to clientLoader.classLoader, which contains - // jars passed by --jars and is loaded by the main thread's classLoader. Thus we can load all jars - // passed by --jars and AddJarCommand. - state.getConf.setClassLoader(clientLoader.classLoader) + // Hive 2.3 will set a UDFClassLoader on hiveConf when initializing SessionState + // (since HIVE-11878), and AddJarCommand will add jars to clientLoader.classLoader. + // For this reason the jars added by AddJarCommand cannot be loaded, because the + // class loader has changed. We reset it to clientLoader.classLoader here. + if (HiveUtils.isHive23) { + state.getConf.setClassLoader(clientLoader.classLoader) + } SessionState.start(state) state.out = new PrintStream(outputBuffer, true, UTF_8.name()) state.err = new PrintStream(outputBuffer, true, UTF_8.name())
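Net effect of the series: with the built-in Hive 2.3 client, state.getConf's class loader is reset to clientLoader.classLoader before SessionState.start, so serde classes coming from jars registered via --jars or ADD JAR resolve again when HiveTableScanExec instantiates a table's deserializer. A minimal sketch of the invariant this restores, echoing the class-loader probes from PATCH 01 (the serde class name is hypothetical, standing in for any serde shipped in an added jar):

    // Sketch only: "com.example.MySerDe" lives in a jar registered at runtime,
    // which Spark appends to clientLoader.classLoader (cf. the PATCH 01 logs of
    // sharedState.jarClassLoader).
    val serdeClassName = "com.example.MySerDe" // hypothetical class name
    // Before PATCH 22, state.getConf.getClassLoader is Hive's UDFClassLoader,
    // which cannot see the added jar, and this can throw ClassNotFoundException.
    // After PATCH 22 it is clientLoader.classLoader, and the lookup succeeds:
    Class.forName(serdeClassName, true, state.getConf.getClassLoader)

To exercise it by hand: ADD JAR the serde jar in spark-sql, create a table with ROW FORMAT SERDE pointing at a class from that jar, and select from it; without the reset, the scan can fail to load the serde under the built-in Hive 2.3 profile.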