From ce27fb32d2d988cde6fe64b0fd2bd7f9c8221859 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Wed, 27 Mar 2019 16:27:23 +0800
Subject: [PATCH 01/11] Merge remote-tracking branch 'upstream/master' into
 SPARK-23710-hadoop3

# Conflicts:
#	dev/deps/spark-deps-hadoop-3.2
#	pom.xml
#	sql/core/pom.xml
#	sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
#	sql/core/v1.2.1/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
#	sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
#	sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
#	sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
---
 dev/deps/spark-deps-hadoop-3.2                |  16 ++-
 pom.xml                                       | 136 ++++++++++++++++++
 .../datasources/orc/OrcColumnVector.java      |   2 +-
 .../datasources/orc/OrcFilters.scala          |  10 +-
 .../datasources/orc/OrcShimUtils.scala        |  10 +-
 .../datasources/orc/OrcFilterSuite.scala      |   2 +-
 .../org/apache/spark/sql/hive/HiveShim.scala  |  73 ++++++----
 .../org/apache/spark/sql/hive/HiveUtils.scala |   8 +-
 .../org/apache/spark/sql/hive/hiveUDFs.scala  |  19 ++-
 .../spark/sql/hive/orc/OrcFilters.scala       |  90 ++++++++----
 10 files changed, 298 insertions(+), 68 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2
index 6f3bbce3e746..0bd1f33ec72b 100644
--- a/dev/deps/spark-deps-hadoop-3.2
+++ b/dev/deps/spark-deps-hadoop-3.2
@@ -30,7 +30,7 @@ commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
 commons-compiler-3.0.11.jar
-commons-compress-1.8.1.jar
+commons-compress-1.9.jar
 commons-configuration2-2.1.1.jar
 commons-crypto-1.0.0.jar
 commons-daemon-1.0.13.jar
@@ -50,7 +50,7 @@ curator-client-2.13.0.jar
 curator-framework-2.13.0.jar
 curator-recipes-2.13.0.jar
 datanucleus-api-jdo-3.2.6.jar
-datanucleus-core-3.2.10.jar
+datanucleus-core-4.1.17.jar
 datanucleus-rdbms-3.2.9.jar
 derby-10.12.1.1.jar
 dnsjava-2.1.7.jar
@@ -76,6 +76,16 @@ hadoop-yarn-common-3.2.0.jar
 hadoop-yarn-registry-3.2.0.jar
 hadoop-yarn-server-common-3.2.0.jar
 hadoop-yarn-server-web-proxy-3.2.0.jar
+hive-common-2.3.4.jar
+hive-llap-client-2.3.4.jar
+hive-llap-common-2.3.4.jar
+hive-serde-2.3.4.jar
+hive-service-rpc-2.3.4.jar
+hive-shims-0.23-2.3.4.jar
+hive-shims-2.3.4.jar
+hive-shims-common-2.3.4.jar
+hive-shims-scheduler-2.3.4.jar
+hive-storage-api-2.6.0.jar
 hk2-api-2.4.0-b34.jar
 hk2-locator-2.4.0-b34.jar
 hk2-utils-2.4.0-b34.jar
@@ -124,6 +134,7 @@ jline-2.14.6.jar
 joda-time-2.9.3.jar
 jodd-core-3.5.2.jar
 jpam-1.1.jar
+json-1.8.jar
 json-smart-2.3.jar
 json4s-ast_2.12-3.5.3.jar
 json4s-core_2.12-3.5.3.jar
@@ -176,6 +187,7 @@ okhttp-3.8.1.jar
 okio-1.13.0.jar
 opencsv-2.3.jar
 orc-core-1.5.5-nohive.jar
+orc-core-1.5.5.jar
 orc-mapreduce-1.5.5-nohive.jar
 orc-shims-1.5.5.jar
 oro-2.0.8.jar

diff --git a/pom.xml b/pom.xml
index 0cce66472be6..37aea0467283 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1414,6 +1414,23 @@ commons-logging
 commons-logging
+
+ org.eclipse.jetty.aggregate jetty-all
+
+ org.eclipse.jetty.orbit javax.servlet
+
+ com.github.joshelser dropwizard-metrics-hadoop-metrics2-reporter
+
+ org.apache.logging.log4j *
+
@@ -1532,6 +1549,31 @@ org.json
 json
+
+ ${hive.group} hive-vector-code-gen
+
+ ${hive.group} hive-llap-tez
+
+ org.apache.orc orc-tools
+
+ org.apache.calcite.avatica avatica-metrics
+
+ org.apache.calcite.avatica avatica
+
+ org.apache.logging.log4j *
+
@@ -1697,6 +1739,35 @@ org.codehaus.groovy
 groovy-all
+
+ org.apache.hbase hbase-client
+
+ HikariCP com.zaxxer
+
+ co.cask.tephra *
+
+ org.eclipse.jetty.aggregate jetty-all
+
+ org.apache.parquet parquet-hadoop-bundle
+
+ tomcat jasper-compiler
+
+ tomcat jasper-runtime
+
@@ -1762,6 +1833,11 @@ org.codehaus.groovy
 groovy-all
+
+ org.apache.logging.log4j log4j-slf4j-impl
+
@@ -2656,7 +2732,67 @@ 3.2.0
 2.13.0
 3.4.13
 org.apache.hive core 2.3.4 ${hive.version} ${parquet.version}
+ org.apache.parquet 4.1.17
+
+ ${hive.group} hive-common
+
+ ${hive.group} hive-serde
+
+ ${hive.group} hive-shims
+
+ ${hive.group} hive-llap-client ${hive.version} ${hive.deps.scope}
+
+ ${hive.group} hive-common
+
+ ${hive.group} hive-serde
+
+ org.apache.zookeeper zookeeper
+
+ org.apache.curator curator-framework
+
+ org.apache.curator apache-curator
+
+ org.slf4j slf4j-api
+
+
+
+
+
+ org.apache.hive hive-storage-api 2.6.0
+

diff --git a/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 9bfad1e83ee7..2f1925e69a33 100644
--- a/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -19,7 +19,7 @@
 import java.math.BigDecimal;
 
-import org.apache.orc.storage.ql.exec.vector.*;
+import org.apache.hadoop.hive.ql.exec.vector.*;
 
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;

diff --git a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
index 112dcb2cb238..85d61bccc8e3 100644
--- a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
+++ b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala
@@ -17,11 +17,11 @@
 package org.apache.spark.sql.execution.datasources.orc
 
-import org.apache.orc.storage.common.`type`.HiveDecimal
-import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
-import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
-import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
-import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
 
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._

diff --git a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
index 68503aba22b4..c32f024476e6 100644
--- a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
+++ b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.datasources.orc
 
 import java.sql.Date
 
-import org.apache.orc.storage.common.`type`.HiveDecimal
-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch
-import org.apache.orc.storage.ql.io.sarg.{SearchArgument => OrcSearchArgument}
-import org.apache.orc.storage.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
-import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
+import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument => OrcSearchArgument}
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
+import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable}
 
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.types.Decimal

diff --git a/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index e96c6fb7716c..1ed42f1c5f75 100644
--- a/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -23,7 +23,7 @@ import java.sql.{Date, Timestamp}
 
 import scala.collection.JavaConverters._
 
-import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
+import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
 
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
 import org.apache.spark.sql.catalyst.dsl.expressions._

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index c9fc3d4a02c4..c4ef8e7950df 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.hive
 
 import java.io.{InputStream, OutputStream}
+import java.lang.reflect.Method
 import java.rmi.server.UID
 
 import scala.collection.JavaConverters._
@@ -28,15 +29,13 @@ import com.google.common.base.Objects
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
+import org.apache.hadoop.hive.ql.exec.UDF
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
 import org.apache.hadoop.hive.serde2.avro.{AvroGenericRecordWritable, AvroSerdeUtils}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
 import org.apache.hadoop.io.Writable
-import org.apache.hive.com.esotericsoftware.kryo.Kryo
-import org.apache.hive.com.esotericsoftware.kryo.io.{Input, Output}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.types.Decimal
@@ -146,34 +145,60 @@ private[hive] object HiveShim {
     case _ => false
   }
 
-  @transient
-  def deserializeObjectByKryo[T: ClassTag](
-      kryo: Kryo,
-      in: InputStream,
-      clazz: Class[_]): T = {
-    val inp = new Input(in)
-    val t: T = kryo.readObject(inp, clazz).asInstanceOf[T]
-    inp.close()
-    t
-  }
+  private lazy val serUtilClass =
+    Utils.classForName("org.apache.hadoop.hive.ql.exec.SerializationUtilities")
+  private lazy val utilClass = Utils.classForName("org.apache.hadoop.hive.ql.exec.Utilities")
+  private val deserializeMethodName = "deserializeObjectByKryo"
+  private val serializeMethodName = "serializeObjectByKryo"
 
-  @transient
-  def serializeObjectByKryo(
-      kryo: Kryo,
-      plan: Object,
-      out: OutputStream) {
-    val output: Output = new Output(out)
-    kryo.writeObject(output, plan)
-    output.close()
+  private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
+    val method = klass.getDeclaredMethod(name, args: _*)
+    method.setAccessible(true)
+    method
   }
 
   def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
-    deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
-      .asInstanceOf[UDFType]
+    if (HiveUtils.isHive2) {
+      val borrowKryo = serUtilClass.getMethod("borrowKryo")
+      val kryo = borrowKryo.invoke(serUtilClass)
+      val deserializeObjectByKryo = findMethod(serUtilClass, deserializeMethodName,
+        kryo.getClass.getSuperclass, classOf[InputStream], classOf[Class[_]])
+      try {
+        deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType]
+      } finally {
+        serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
+      }
+    } else {
+      val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo")
+      val threadLocalValue = runtimeSerializationKryo.get(utilClass)
+      val getMethod = threadLocalValue.getClass.getMethod("get")
+      val kryo = getMethod.invoke(threadLocalValue)
+      val deserializeObjectByKryo = findMethod(utilClass, deserializeMethodName,
+        kryo.getClass, classOf[InputStream], classOf[Class[_]])
+      deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType]
+    }
   }
 
   def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
-    serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
+    if (HiveUtils.isHive2) {
+      val borrowKryo = serUtilClass.getMethod("borrowKryo")
+      val kryo = borrowKryo.invoke(serUtilClass)
+      val serializeObjectByKryo = findMethod(serUtilClass, serializeMethodName,
+        kryo.getClass.getSuperclass, classOf[Object], classOf[OutputStream])
+      try {
+        serializeObjectByKryo.invoke(null, kryo, function, out)
+      } finally {
+        serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
+      }
+    } else {
+      val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo")
+      val threadLocalValue = runtimeSerializationKryo.get(utilClass)
+      val getMethod = threadLocalValue.getClass.getMethod("get")
+      val kryo = getMethod.invoke(threadLocalValue)
+      val serializeObjectByKryo = findMethod(utilClass, serializeMethodName,
+        kryo.getClass, classOf[Object], classOf[OutputStream])
+      serializeObjectByKryo.invoke(null, kryo, function, out)
+    }
   }
 
   def writeExternal(out: java.io.ObjectOutput) {

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index a7f40c6bff0b..d7f0371e440a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.util.VersionInfo
+import org.apache.hive.common.util.HiveVersionInfo
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -54,8 +55,11 @@ private[spark] object HiveUtils extends Logging {
     sc
   }
 
+  private val hive1Version = "1.2.1"
+  private val hive2Version = "2.3.4"
+  val isHive2: Boolean = HiveVersionInfo.getVersion.equals(hive2Version)
+
   /** The version of hive used internally by Spark SQL. */
-  val builtinHiveVersion: String = "1.2.1"
+  val builtinHiveVersion: String = if (isHive2) hive2Version else hive1Version
 
   val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
@@ -207,8 +211,6 @@ private[spark] object HiveUtils extends Logging {
     ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_READER_WAIT -> TimeUnit.MILLISECONDS,
     ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT -> TimeUnit.SECONDS,
     ConfVars.HIVE_LOG_INCREMENTAL_PLAN_PROGRESS_INTERVAL -> TimeUnit.MILLISECONDS,
-    ConfVars.HIVE_STATS_JDBC_TIMEOUT -> TimeUnit.SECONDS,
-    ConfVars.HIVE_STATS_RETRIES_WAIT -> TimeUnit.MILLISECONDS,
     ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES -> TimeUnit.SECONDS,
     ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT -> TimeUnit.MILLISECONDS,
     ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME -> TimeUnit.MILLISECONDS,

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 8ece4b5caef6..7f8610ebb25f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.hive
 
+import java.lang.{Boolean => JBoolean}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConverters._
@@ -38,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.sql.types._
-
+import org.apache.spark.util.Utils
 
 private[hive] case class HiveSimpleUDF(
     name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
@@ -336,8 +337,20 @@ private[hive] case class HiveUDAFFunction(
       funcWrapper.createFunction[AbstractGenericUDAFResolver]()
     }
 
-    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
-    resolver.getEvaluator(parameterInfo)
+    val clazz = Utils.classForName(classOf[SimpleGenericUDAFParameterInfo].getName)
+    if (HiveUtils.isHive2) {
+      val ctor = clazz.getDeclaredConstructor(
+        classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE)
+      val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE, JBoolean.FALSE)
+      val parameterInfo = ctor.newInstance(args: _*).asInstanceOf[SimpleGenericUDAFParameterInfo]
+      resolver.getEvaluator(parameterInfo)
+    } else {
+      val ctor = clazz.getDeclaredConstructor(
+        classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE)
+      val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE)
+      val parameterInfo = ctor.newInstance(args: _*).asInstanceOf[SimpleGenericUDAFParameterInfo]
+      resolver.getEvaluator(parameterInfo)
+    }
   }
 
   private case class HiveEvaluator(

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index a82576a233ac..46fd31554222 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -17,14 +17,19 @@
 package org.apache.spark.sql.hive.orc
 
+import java.lang.reflect.Method
+
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.datasources.orc.{OrcFilters => BuiltinOrcFilters}
 import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down.
@@ -57,22 +62,33 @@
  * known to be convertible.
  */
 private[orc] object OrcFilters extends Logging {
+
+  private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
+    val method = klass.getMethod(name, args: _*)
+    method.setAccessible(true)
+    method
+  }
+
   def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = {
-    val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
-
-    // First, tries to convert each filter individually to see whether it's convertible, and then
-    // collect all convertible ones to build the final `SearchArgument`.
-    val convertibleFilters = for {
-      filter <- filters
-      _ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
-    } yield filter
-
-    for {
-      // Combines all convertible filters using `And` to produce a single conjunction
-      conjunction <- buildTree(convertibleFilters)
-      // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
-      builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
-    } yield builder.build()
+    if (HiveUtils.isHive2) {
+      BuiltinOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
+    } else {
+      val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+
+      // First, tries to convert each filter individually to see whether it's convertible, and then
+      // collect all convertible ones to build the final `SearchArgument`.
+      val convertibleFilters = for {
+        filter <- filters
+        _ <- buildSearchArgument(dataTypeMap, filter, newBuilder)
+      } yield filter
+
+      for {
+        // Combines all convertible filters using `And` to produce a single conjunction
+        conjunction <- buildTree(convertibleFilters)
+        // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
+        builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder)
+      } yield builder.build()
+    }
   }
 
   private def buildSearchArgument(
@@ -160,31 +176,57 @@
       // wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
 
       case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startAnd().equals(attribute, value).end())
+        val bd = builder.startAnd()
+        val method = findMethod(bd.getClass, "equals", classOf[String], classOf[Object])
+        Some(method.invoke(bd, attribute, value.asInstanceOf[Object])
+          .asInstanceOf[SearchArgument.Builder].end())
 
       case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startAnd().nullSafeEquals(attribute, value).end())
+        val bd = builder.startAnd()
+        val method = findMethod(bd.getClass, "nullSafeEquals", classOf[String], classOf[Object])
+        Some(method.invoke(bd, attribute, value.asInstanceOf[Object])
+          .asInstanceOf[SearchArgument.Builder].end())
 
       case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startAnd().lessThan(attribute, value).end())
+        val bd = builder.startAnd()
+        val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object])
+        Some(method.invoke(bd, attribute, value.asInstanceOf[Object])
+          .asInstanceOf[SearchArgument.Builder].end())
 
       case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startAnd().lessThanEquals(attribute, value).end())
+        val bd = builder.startAnd()
+        val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object])
+        Some(method.invoke(bd, attribute, value.asInstanceOf[Object])
+          .asInstanceOf[SearchArgument.Builder].end())
 
       case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startNot().lessThanEquals(attribute, value).end())
+        val bd = builder.startNot()
+        val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object])
+        Some(method.invoke(bd, attribute, value.asInstanceOf[Object])
+          .asInstanceOf[SearchArgument.Builder].end())
 
       case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startNot().lessThan(attribute, value).end())
+        val bd = builder.startNot()
+        val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object])
+        Some(method.invoke(bd, attribute, value.asInstanceOf[Object])
+          .asInstanceOf[SearchArgument.Builder].end())
 
       case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startAnd().isNull(attribute).end())
+        val bd = builder.startAnd()
+        val method = findMethod(bd.getClass, "isNull", classOf[String])
+        Some(method.invoke(bd, attribute).asInstanceOf[Builder].end())
 
       case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startNot().isNull(attribute).end())
+        val bd = builder.startNot()
+        val method = findMethod(bd.getClass, "isNull", classOf[String])
+        Some(method.invoke(bd, attribute).asInstanceOf[SearchArgument.Builder].end())
 
       case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
-        Some(builder.startAnd().in(attribute, values.map(_.asInstanceOf[AnyRef]): _*).end())
+        val bd = builder.startAnd()
+        val method = findMethod(bd.getClass, "in", classOf[String],
+          Utils.classForName("[Ljava.lang.Object;"))
+        Some(method.invoke(bd, attribute, values.map(_.asInstanceOf[AnyRef]))
+          .asInstanceOf[SearchArgument.Builder].end())
 
       case _ => None
     }
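Patch 01 above replaces direct Kryo calls in `HiveShim` with reflection, so the same source compiles and runs against both Hive 1.2 (`Utilities.runtimeSerializationKryo`) and Hive 2.3 (`SerializationUtilities.borrowKryo`/`releaseKryo`). A minimal, self-contained sketch of that lookup-then-invoke pattern, using `Integer.parseInt` as a stand-in target so no Hive class is assumed on the classpath:

```scala
import java.lang.reflect.Method

object ReflectiveDispatchSketch {
  // Look up a possibly non-public method and make it callable,
  // mirroring HiveShim.findMethod in the patch above.
  def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
    val method = klass.getDeclaredMethod(name, args: _*)
    method.setAccessible(true)
    method
  }

  def main(args: Array[String]): Unit = {
    // Resolve the implementation class by name at runtime, the way the patch
    // resolves SerializationUtilities vs. Utilities via Utils.classForName.
    val clazz = Class.forName("java.lang.Integer")
    val parse = findMethod(clazz, "parseInt", classOf[String])
    // Static methods are invoked with a null receiver, as in the patch.
    val n = parse.invoke(null, "42")
    println(n) // prints: 42
  }
}
```

Because the Hive classes are only touched through `Class`/`Method` handles, the `else` branch can reference Hive 1.2 internals and the `if` branch Hive 2.3 internals without either being present at compile time.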
From 0f2f07c717e36509cc037ee7ad29df1b4d15fd3a Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Thu, 28 Mar 2019 22:24:50 +0800
Subject: [PATCH 02/11] Fix exclusion

---
 dev/deps/spark-deps-hadoop-3.2          |  6 ++
 pom.xml                                 | 62 +++----------------
 .../spark/sql/hive/orc/OrcFilters.scala |  4 +-
 3 files changed, 15 insertions(+), 57 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2
index 0bd1f33ec72b..327a7ad78ba3 100644
--- a/dev/deps/spark-deps-hadoop-3.2
+++ b/dev/deps/spark-deps-hadoop-3.2
@@ -5,6 +5,7 @@ ST4-4.0.4.jar
 accessors-smart-1.2.jar
 activation-1.1.1.jar
 aircompressor-0.10.jar
+ant-1.6.5.jar
 antlr-2.7.7.jar
 antlr-runtime-3.4.jar
 antlr4-runtime-4.7.1.jar
@@ -35,6 +36,7 @@ commons-configuration2-2.1.1.jar
 commons-crypto-1.0.0.jar
 commons-daemon-1.0.13.jar
 commons-dbcp-1.4.jar
+commons-el-1.0.jar
 commons-httpclient-3.1.jar
 commons-io-2.4.jar
 commons-lang-2.6.jar
@@ -54,6 +56,7 @@ datanucleus-core-4.1.17.jar
 datanucleus-rdbms-3.2.9.jar
 derby-10.12.1.1.jar
 dnsjava-2.1.7.jar
+dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
 ehcache-3.3.1.jar
 flatbuffers-java-1.9.0.jar
 generex-1.0.1.jar
@@ -109,10 +112,13 @@ jackson-module-scala_2.12-2.9.8.jar
 jakarta.activation-api-1.2.1.jar
 jakarta.xml.bind-api-2.3.2.jar
 janino-3.0.11.jar
+jasper-compiler-5.5.23.jar
+jasper-runtime-5.5.23.jar
 javassist-3.18.1-GA.jar
 javax.annotation-api-1.2.jar
 javax.inject-1.jar
 javax.inject-2.4.0-b34.jar
+javax.servlet-3.0.0.v201112011016.jar
 javax.servlet-api-3.1.0.jar
 javax.ws.rs-api-2.0.1.jar
 javolution-5.5.1.jar

diff --git a/pom.xml b/pom.xml
index 37aea0467283..996cdb887e26 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1419,14 +1419,6 @@ org.eclipse.jetty.aggregate
 jetty-all
-
- org.eclipse.jetty.orbit javax.servlet
-
- com.github.joshelser dropwizard-metrics-hadoop-metrics2-reporter
-
 org.apache.logging.log4j *
@@ -1550,21 +1542,13 @@ json
-
- ${hive.group} hive-vector-code-gen
-
 ${hive.group} hive-llap-tez
-
- org.apache.orc orc-tools
-
- org.apache.calcite.avatica avatica-metrics
+
+ org.apache.calcite calcite-druid
 org.apache.calcite.avatica avatica
@@ -1740,33 +1724,17 @@ groovy-all
-
- org.apache.hbase hbase-client
-
- HikariCP com.zaxxer
-
- co.cask.tephra *
-
- org.eclipse.jetty.aggregate jetty-all
-
 org.apache.parquet parquet-hadoop-bundle
-
- tomcat jasper-compiler
+ javax.servlet servlet-api
-
- tomcat jasper-runtime
+ javax.servlet jsp-api
@@ -2760,18 +2728,6 @@
-
- ${hive.group} hive-common
-
- ${hive.group} hive-serde
-
- org.apache.zookeeper zookeeper
-
 org.apache.curator curator-framework
@@ -2780,13 +2736,9 @@
 org.apache.curator apache-curator
-
- org.slf4j slf4j-api
-
-
+
 org.apache.hive hive-storage-api

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 46fd31554222..29f57d3ad1cc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.orc.{OrcFilters => BuiltinOrcFilters}
+import org.apache.spark.sql.execution.datasources.orc.{OrcFilters => DatasourceOrcFilters}
 import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.sources._
@@ -71,7 +71,7 @@ private[orc] object OrcFilters extends Logging {
 
   def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = {
     if (HiveUtils.isHive2) {
-      BuiltinOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
+      DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
     } else {
       val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap

From 5d584c89bac30626197d4309274da9f3480ad50d Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 31 Mar 2019 00:34:26 +0800
Subject: [PATCH 03/11] Add exclusion reason to pom.xml

---
 dev/deps/spark-deps-hadoop-3.2                |  4 ---
 pom.xml                                       | 28 +++++++++++++------
 .../org/apache/spark/sql/hive/HiveShim.scala  |  4 +--
 .../org/apache/spark/sql/hive/HiveUtils.scala |  8 +++---
 .../org/apache/spark/sql/hive/hiveUDFs.scala  |  2 +-
 .../spark/sql/hive/orc/OrcFilters.scala       |  2 +-
 6 files changed, 28 insertions(+), 20 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2
index 327a7ad78ba3..b8a33d036d80 100644
--- a/dev/deps/spark-deps-hadoop-3.2
+++ b/dev/deps/spark-deps-hadoop-3.2
@@ -5,7 +5,6 @@ ST4-4.0.4.jar
 accessors-smart-1.2.jar
 activation-1.1.1.jar
 aircompressor-0.10.jar
-ant-1.6.5.jar
 antlr-2.7.7.jar
 antlr-runtime-3.4.jar
 antlr4-runtime-4.7.1.jar
@@ -36,7 +35,6 @@ commons-configuration2-2.1.1.jar
 commons-crypto-1.0.0.jar
 commons-daemon-1.0.13.jar
 commons-dbcp-1.4.jar
-commons-el-1.0.jar
 commons-httpclient-3.1.jar
 commons-io-2.4.jar
 commons-lang-2.6.jar
@@ -112,8 +110,6 @@ jackson-module-scala_2.12-2.9.8.jar
 jakarta.activation-api-1.2.1.jar
 jakarta.xml.bind-api-2.3.2.jar
 janino-3.0.11.jar
-jasper-compiler-5.5.23.jar
-jasper-runtime-5.5.23.jar
 javassist-3.18.1-GA.jar
 javax.annotation-api-1.2.jar
 javax.inject-1.jar

diff --git a/pom.xml b/pom.xml
index 996cdb887e26..58d89e12a77b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1414,15 +1414,18 @@ commons-logging
 commons-logging
-
+
+
 org.eclipse.jetty.aggregate jetty-all
+
 org.apache.logging.log4j *
+
@@ -1541,11 +1544,13 @@ org.json
 json
-
+
+
 ${hive.group} hive-llap-tez
+
 org.apache.calcite calcite-druid
 org.apache.calcite.avatica avatica
+
 org.apache.logging.log4j *
+
@@ -1723,19 +1730,22 @@ org.codehaus.groovy
 groovy-all
-
+
+
 org.apache.parquet parquet-hadoop-bundle
+
-
- javax.servlet servlet-api
+ tomcat jasper-compiler
-
- javax.servlet jsp-api
+ tomcat jasper-runtime
+
@@ -1801,11 +1811,13 @@ org.codehaus.groovy
 groovy-all
-
+
+
 org.apache.logging.log4j log4j-slf4j-impl
+

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index c4ef8e7950df..aa4f6beaa4ab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -158,7 +158,7 @@ private[hive] object HiveShim {
   }
 
   def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
-    if (HiveUtils.isHive2) {
+    if (HiveUtils.isSupportedHive2) {
       val borrowKryo = serUtilClass.getMethod("borrowKryo")
       val kryo = borrowKryo.invoke(serUtilClass)
       val deserializeObjectByKryo = findMethod(serUtilClass, deserializeMethodName,
@@ -180,7 +180,7 @@ private[hive] object HiveShim {
   }
 
   def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
-    if (HiveUtils.isHive2) {
+    if (HiveUtils.isSupportedHive2) {
       val borrowKryo = serUtilClass.getMethod("borrowKryo")
       val kryo = borrowKryo.invoke(serUtilClass)
       val serializeObjectByKryo = findMethod(serUtilClass, serializeMethodName,

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index d7f0371e440a..d83d0e6be696 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -55,11 +55,11 @@ private[spark] object HiveUtils extends Logging {
     sc
   }
 
-  private val hive1Version = "1.2.1"
-  private val hive2Version = "2.3.4"
-  val isHive2: Boolean = HiveVersionInfo.getVersion.equals(hive2Version)
+  private val supportedHive2ShortVersions = Set("2.3.0")
+  val isSupportedHive2 = supportedHive2ShortVersions.contains(HiveVersionInfo.getShortVersion)
 
   /** The version of hive used internally by Spark SQL. */
-  val builtinHiveVersion: String = if (isHive2) hive2Version else hive1Version
+  val builtinHiveVersion: String = if (isSupportedHive2) HiveVersionInfo.getVersion else "1.2.1"
 
   val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 7f8610ebb25f..283b90519cae 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -338,7 +338,7 @@ private[hive] case class HiveUDAFFunction(
     }
 
     val clazz = Utils.classForName(classOf[SimpleGenericUDAFParameterInfo].getName)
-    if (HiveUtils.isHive2) {
+    if (HiveUtils.isSupportedHive2) {
       val ctor = clazz.getDeclaredConstructor(
         classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE)
       val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE, JBoolean.FALSE)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 29f57d3ad1cc..638e4cb3040f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -70,7 +70,7 @@ private[orc] object OrcFilters extends Logging {
   }
 
   def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = {
-    if (HiveUtils.isHive2) {
+    if (HiveUtils.isSupportedHive2) {
       DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
     } else {
       val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
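Patch 03 above keys the Hive 2 detection off the short version reported by `HiveVersionInfo`, falling back to the bundled 1.2.1 otherwise. A sketch of that gate with the Hive call stubbed out; `reportedVersion` is a stand-in for the real `HiveVersionInfo.getShortVersion` lookup, and the version literals mirror the patch:

```scala
object HiveVersionGateSketch {
  // Stand-in for org.apache.hive.common.util.HiveVersionInfo.getShortVersion,
  // which reports the Hive version actually present on the classpath.
  val reportedVersion: String = "2.3.0"

  private val supportedHive2ShortVersions = Set("2.3.0")
  val isSupportedHive2: Boolean = supportedHive2ShortVersions.contains(reportedVersion)

  // The built-in version falls back to 1.2.1 when the 2.3 line is not present.
  val builtinHiveVersion: String = if (isSupportedHive2) "2.3.4" else "1.2.1"

  def main(args: Array[String]): Unit =
    println(s"isSupportedHive2=$isSupportedHive2, builtin=$builtinHiveVersion")
}
```

The point of probing the classpath rather than a build flag is that one compiled Spark artifact behaves correctly whichever Hive profile produced the final assembly.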
From 88d82403ba5c59ccb5bdea7a1c57eb93561eb9c4 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 31 Mar 2019 23:40:12 +0800
Subject: [PATCH 04/11] Move Hadoop-3's extra Hive dependency to sql/hive
 module

---
 dev/deps/spark-deps-hadoop-3.2 | 16 +------------
 pom.xml                        | 43 +++++++++------------------------
 sql/hive/pom.xml               | 42 ++++++++++++++++++++++-----------
 3 files changed, 43 insertions(+), 58 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2
index b8a33d036d80..377c9cf5cd56 100644
--- a/dev/deps/spark-deps-hadoop-3.2
+++ b/dev/deps/spark-deps-hadoop-3.2
@@ -30,7 +30,7 @@ commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
 commons-compiler-3.0.11.jar
-commons-compress-1.9.jar
+commons-compress-1.8.1.jar
 commons-configuration2-2.1.1.jar
 commons-crypto-1.0.0.jar
 commons-daemon-1.0.13.jar
@@ -54,7 +54,6 @@ datanucleus-core-4.1.17.jar
 datanucleus-rdbms-3.2.9.jar
 derby-10.12.1.1.jar
 dnsjava-2.1.7.jar
-dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
 ehcache-3.3.1.jar
 flatbuffers-java-1.9.0.jar
 generex-1.0.1.jar
@@ -77,16 +76,6 @@ hadoop-yarn-common-3.2.0.jar
 hadoop-yarn-registry-3.2.0.jar
 hadoop-yarn-server-common-3.2.0.jar
 hadoop-yarn-server-web-proxy-3.2.0.jar
-hive-common-2.3.4.jar
-hive-llap-client-2.3.4.jar
-hive-llap-common-2.3.4.jar
-hive-serde-2.3.4.jar
-hive-service-rpc-2.3.4.jar
-hive-shims-0.23-2.3.4.jar
-hive-shims-2.3.4.jar
-hive-shims-common-2.3.4.jar
-hive-shims-scheduler-2.3.4.jar
-hive-storage-api-2.6.0.jar
 hk2-api-2.4.0-b34.jar
 hk2-locator-2.4.0-b34.jar
 hk2-utils-2.4.0-b34.jar
@@ -114,7 +103,6 @@ javassist-3.18.1-GA.jar
 javax.annotation-api-1.2.jar
 javax.inject-1.jar
 javax.inject-2.4.0-b34.jar
-javax.servlet-3.0.0.v201112011016.jar
 javax.servlet-api-3.1.0.jar
 javax.ws.rs-api-2.0.1.jar
 javolution-5.5.1.jar
@@ -136,7 +124,6 @@ jline-2.14.6.jar
 joda-time-2.9.3.jar
 jodd-core-3.5.2.jar
 jpam-1.1.jar
-json-1.8.jar
 json-smart-2.3.jar
 json4s-ast_2.12-3.5.3.jar
 json4s-core_2.12-3.5.3.jar
@@ -189,7 +176,6 @@ okhttp-3.8.1.jar
 okio-1.13.0.jar
 opencsv-2.3.jar
 orc-core-1.5.5-nohive.jar
-orc-core-1.5.5.jar
 orc-mapreduce-1.5.5-nohive.jar
 orc-shims-1.5.5.jar
 oro-2.0.8.jar

diff --git a/pom.xml b/pom.xml
index 58d89e12a77b..24b08cd79ead 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,6 +130,12 @@
 1.2.1.spark2
 1.2.1
+
+ provided
 2.2.0
 10.12.1.1
@@ -1390,7 +1384,7 @@
 ${hive.group} hive-common
 ${hive.version}
- ${hive.deps.scope}
+ ${hive.extra.deps.scope}
 ${hive.group}
@@ -1686,7 +1680,7 @@
 ${hive.group} hive-serde
 ${hive.version}
- ${hive.deps.scope}
+ ${hive.extra.deps.scope}
 ${hive.group}
@@ -1775,7 +1769,7 @@
 ${hive.group} hive-shims
 ${hive.version}
- ${hive.deps.scope}
+ ${hive.extra.deps.scope}
 com.google.guava
@@ -2716,41 +2722,14 @@
 core
 2.3.4
 ${hive.version}
+ ${hive.deps.scope}
 ${parquet.version}
 org.apache.parquet
 4.1.17
-
- ${hive.group} hive-common
-
- ${hive.group} hive-serde
-
- ${hive.group} hive-shims
-
- ${hive.group} hive-llap-client ${hive.version} ${hive.deps.scope}
-
- org.apache.curator curator-framework
-
- org.apache.curator apache-curator
-
-
-
+
 org.apache.hive hive-storage-api

diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 55afbe7c5d65..6e9daa9e071e 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -88,12 +88,10 @@
 ${protobuf.version}
 -->
-
 ${hive.group} hive-exec
@@ -103,16 +101,38 @@
 ${hive.group} hive-metastore
-
+
+ org.apache.curator curator-framework
+
+ org.apache.curator apache-curator
+
 org.apache.avro

From 4c6c25fc1a9a78e6f9b19e4cecf8f1a4be3f8c74 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Mon, 1 Apr 2019 06:59:15 +0800
Subject: [PATCH 05/11] Fix deps issue

---
 dev/deps/spark-deps-hadoop-3.2 | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2
index 377c9cf5cd56..03e1e3e4c95f 100644
--- a/dev/deps/spark-deps-hadoop-3.2
+++ b/dev/deps/spark-deps-hadoop-3.2
@@ -76,6 +76,7 @@ hadoop-yarn-common-3.2.0.jar
 hadoop-yarn-registry-3.2.0.jar
 hadoop-yarn-server-common-3.2.0.jar
 hadoop-yarn-server-web-proxy-3.2.0.jar
+hive-storage-api-2.6.0.jar
 hk2-api-2.4.0-b34.jar
 hk2-locator-2.4.0-b34.jar
 hk2-utils-2.4.0-b34.jar

From 78825a730db80161eb628f217bbf791c742ba573 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Tue, 2 Apr 2019 14:10:51 +0800
Subject: [PATCH 06/11] Fix dependency

---
 pom.xml          | 42 +++++++++++++++++---------
 sql/hive/pom.xml | 63 ++++++++++++++++++------------------
 2 files changed, 63 insertions(+), 42 deletions(-)

diff --git a/pom.xml b/pom.xml
index 24b08cd79ead..665a375a09a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,12 +130,6 @@
 1.2.1.spark2
 1.2.1
-
- provided
 2.2.0
 10.12.1.1
@@ -1390,7 +1384,7 @@
 ${hive.group} hive-common
 ${hive.version}
- ${hive.extra.deps.scope}
+ ${hive.deps.scope}
 ${hive.group}
@@ -1686,7 +1680,7 @@
 ${hive.group} hive-serde
 ${hive.version}
- ${hive.extra.deps.scope}
+ ${hive.deps.scope}
 ${hive.group}
@@ -1775,7 +1769,7 @@
 ${hive.group} hive-shims
 ${hive.version}
- ${hive.extra.deps.scope}
+ ${hive.deps.scope}
 com.google.guava
@@ -1826,6 +1820,33 @@
+
+
+ org.apache.hive hive-llap-client 2.3.4 ${hive.deps.scope}
+
+ org.apache.hive hive-common
+
+ org.apache.hive hive-serde
+
+ org.apache.curator curator-framework
+
+ org.apache.curator apache-curator
+
+
 org.apache.orc orc-core
@@ -2743,14 +2764,13 @@
 core
 2.3.4
 ${hive.version}
- ${hive.deps.scope}
 ${parquet.version}
 org.apache.parquet
 4.1.17
+
 org.apache.hive hive-storage-api

diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 6e9daa9e071e..4d95ea1567a1 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -88,10 +88,12 @@
 ${protobuf.version}
 -->
+
 ${hive.group} hive-exec
@@ -101,38 +103,16 @@
 ${hive.group} hive-metastore
-
- ${hive.group} hive-serde
-
- ${hive.group} hive-shims
-
- org.apache.hive hive-llap-client 2.3.4 ${hive.extra.deps.scope}
-
- org.apache.hive hive-common
-
- org.apache.hive
 org.apache.avro
@@ -228,6 +208,27 @@
+
+ hadoop-3.2
+
+ ${hive.group} hive-common
+
+ ${hive.group} hive-serde
+
+ ${hive.group} hive-shims
+
+ org.apache.hive hive-llap-client
+
+

From 78ceb007561016ac33b07ecd14676c0e6b0f1661 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Thu, 4 Apr 2019 14:35:09 +0800
Subject: [PATCH 07/11] Matching the Hive 2.3.x prefix

---
 .../src/main/scala/org/apache/spark/sql/hive/HiveShim.scala  | 4 ++--
 .../main/scala/org/apache/spark/sql/hive/HiveUtils.scala     | 6 +++---
 .../src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala  | 2 +-
 .../scala/org/apache/spark/sql/hive/orc/OrcFilters.scala     | 2 +-
 4 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index aa4f6beaa4ab..be4a0c175b6d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -158,7 +158,7 @@ private[hive] object HiveShim {
   }
 
   def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
-    if (HiveUtils.isSupportedHive2) {
+    if (HiveUtils.isHive23) {
       val borrowKryo = serUtilClass.getMethod("borrowKryo")
       val kryo = borrowKryo.invoke(serUtilClass)
       val deserializeObjectByKryo = findMethod(serUtilClass, deserializeMethodName,
@@ -180,7 +180,7 @@ private[hive] object HiveShim {
   }
 
   def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
-    if (HiveUtils.isSupportedHive2) {
+    if (HiveUtils.isHive23) {
       val borrowKryo = serUtilClass.getMethod("borrowKryo")
       val kryo = borrowKryo.invoke(serUtilClass)
       val serializeObjectByKryo = findMethod(serUtilClass, serializeMethodName,

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index f3d1d9df3561..773bf312031a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -55,11 +55,11 @@ private[spark] object HiveUtils extends Logging {
     sc
   }
 
-  private val supportedHive2ShortVersions = Set("2.3.0")
-  val isSupportedHive2 = supportedHive2ShortVersions.contains(HiveVersionInfo.getShortVersion)
+  private val hiveVersion = HiveVersionInfo.getVersion
+  val isHive23: Boolean = hiveVersion.startsWith("2.3")
 
   /** The version of hive used internally by Spark SQL. */
-  val builtinHiveVersion: String = if (isSupportedHive2) HiveVersionInfo.getVersion else "1.2.1"
+  val builtinHiveVersion: String = if (isHive23) hiveVersion else "1.2.1"
 
   val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 283b90519cae..0938576a7147 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -338,7 +338,7 @@ private[hive] case class HiveUDAFFunction(
     }
 
     val clazz = Utils.classForName(classOf[SimpleGenericUDAFParameterInfo].getName)
-    if (HiveUtils.isSupportedHive2) {
+    if (HiveUtils.isHive23) {
       val ctor = clazz.getDeclaredConstructor(
         classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE)
       val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE, JBoolean.FALSE)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 638e4cb3040f..e48d3bea708f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -70,7 +70,7 @@ private[orc] object OrcFilters extends Logging {
   }
 
   def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = {
-    if (HiveUtils.isSupportedHive2) {
+    if (HiveUtils.isHive23) {
       DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
     } else {
       val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
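A recurring pattern in patches 01 through 07 is picking the `SimpleGenericUDAFParameterInfo` constructor by arity, since the Hive 2.3 line added a fourth boolean parameter to a class that has only three in Hive 1.2. A toy illustration of constructor selection via `getDeclaredConstructor`; `ParamInfoV1` and `ParamInfoV2` are hypothetical stand-ins for the two shapes of the Hive class, and `isHive23` mirrors the flag from patch 07:

```scala
// Hypothetical stand-ins: V1 mimics the Hive 1.2 constructor arity,
// V2 mimics Hive 2.3's extra boolean parameter.
class ParamInfoV1(inspectors: Array[String], distinct: Boolean, allColumns: Boolean)
class ParamInfoV2(inspectors: Array[String], windowing: Boolean,
    distinct: Boolean, allColumns: Boolean)

object CtorArityDispatchSketch {
  def build(clazz: Class[_], inspectors: Array[String], isHive23: Boolean): AnyRef = {
    val jFalse = java.lang.Boolean.FALSE
    if (isHive23) {
      // Four-argument constructor: the extra primitive boolean exists only in V2.
      val ctor = clazz.getDeclaredConstructor(classOf[Array[String]],
        java.lang.Boolean.TYPE, java.lang.Boolean.TYPE, java.lang.Boolean.TYPE)
      ctor.newInstance(inspectors, jFalse, jFalse, jFalse).asInstanceOf[AnyRef]
    } else {
      // Three-argument constructor, matching the older class shape.
      val ctor = clazz.getDeclaredConstructor(classOf[Array[String]],
        java.lang.Boolean.TYPE, java.lang.Boolean.TYPE)
      ctor.newInstance(inspectors, jFalse, jFalse).asInstanceOf[AnyRef]
    }
  }

  def main(args: Array[String]): Unit = {
    println(build(classOf[ParamInfoV2], Array("c1"), isHive23 = true).getClass.getSimpleName)
    println(build(classOf[ParamInfoV1], Array("c1"), isHive23 = false).getClass.getSimpleName)
  }
}
```

Boxing matters here: the constructor is looked up with `Boolean.TYPE` (primitive `boolean`), while the actual arguments are passed as boxed `java.lang.Boolean` values, which reflection unboxes automatically.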
From 2c571c7969b2d8d462f0d7d7dad40dfe514922f5 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Fri, 5 Apr 2019 00:34:21 +0800
Subject: [PATCH 08/11] Fix indent

---
 pom.xml                                  |  2 +-
 .../spark/sql/hive/orc/OrcFilters.scala  | 27 +++++++------------
 2 files changed, 10 insertions(+), 19 deletions(-)

diff --git a/pom.xml b/pom.xml
index 4a13ff28a0fe..9461f9d66a1d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1548,7 +1548,7 @@
 ${hive.group}
- hive-llap-tez
+ hive-llap-tez

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index e48d3bea708f..e49f7a986e14 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
 
 /**
  * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down.
@@ -178,38 +177,32 @@ private[orc] object OrcFilters extends Logging {
       case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startAnd()
         val method = findMethod(bd.getClass, "equals", classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, value.asInstanceOf[Object])
-          .asInstanceOf[SearchArgument.Builder].end())
+        Some(method.invoke(bd, attribute, value.asInstanceOf[Object]).asInstanceOf[Builder].end())
 
       case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startAnd()
         val method = findMethod(bd.getClass, "nullSafeEquals", classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, value.asInstanceOf[Object])
-          .asInstanceOf[SearchArgument.Builder].end())
+        Some(method.invoke(bd, attribute, value.asInstanceOf[Object]).asInstanceOf[Builder].end())
 
       case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startAnd()
         val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, value.asInstanceOf[Object])
-          .asInstanceOf[SearchArgument.Builder].end())
+        Some(method.invoke(bd, attribute, value.asInstanceOf[Object]).asInstanceOf[Builder].end())
 
       case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startAnd()
         val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, value.asInstanceOf[Object])
-          .asInstanceOf[SearchArgument.Builder].end())
+        Some(method.invoke(bd, attribute, value.asInstanceOf[Object]).asInstanceOf[Builder].end())
 
       case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startNot()
         val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, value.asInstanceOf[Object])
-          .asInstanceOf[SearchArgument.Builder].end())
+        Some(method.invoke(bd, attribute, value.asInstanceOf[Object]).asInstanceOf[Builder].end())
 
       case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startNot()
         val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, value.asInstanceOf[Object])
-          .asInstanceOf[SearchArgument.Builder].end())
+        Some(method.invoke(bd, attribute, value.asInstanceOf[Object]).asInstanceOf[Builder].end())
 
       case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startAnd()
@@ -219,14 +212,12 @@ private[orc] object OrcFilters extends Logging {
       case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startNot()
         val method = findMethod(bd.getClass, "isNull", classOf[String])
-        Some(method.invoke(bd, attribute).asInstanceOf[SearchArgument.Builder].end())
+        Some(method.invoke(bd, attribute).asInstanceOf[Builder].end())
 
       case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startAnd()
-        val method = findMethod(bd.getClass, "in", classOf[String],
-          Utils.classForName("[Ljava.lang.Object;"))
-        Some(method.invoke(bd, attribute, values.map(_.asInstanceOf[AnyRef]))
-          .asInstanceOf[SearchArgument.Builder].end())
+        val method = findMethod(bd.getClass, "in", classOf[String], classOf[Array[Object]])
+        Some(method.invoke(bd, attribute, values).asInstanceOf[Builder].end())
 
       case _ => None
     }

From d22e7e033c869adac93d4ff3ecbbfb451d92e491 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Fri, 5 Apr 2019 00:52:47 +0800
Subject: [PATCH 09/11] Object -> AnyRef

---
 .../apache/spark/sql/hive/orc/OrcFilters.scala | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index e49f7a986e14..dfac73ce62fa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -177,32 +177,32 @@ private[orc] object OrcFilters extends Logging {
       case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startAnd()
         val method = findMethod(bd.getClass, "equals", classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, value.asInstanceOf[Object]).asInstanceOf[Builder].end())
+        Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
 
       case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startAnd()
         val method = findMethod(bd.getClass, "nullSafeEquals", classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, value.asInstanceOf[Object]).asInstanceOf[Builder].end())
+        Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
 
       case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startAnd()
         val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, value.asInstanceOf[Object]).asInstanceOf[Builder].end())
+        Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
 
       case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startAnd()
         val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, value.asInstanceOf[Object]).asInstanceOf[Builder].end())
+        Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
 
       case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startNot()
         val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, value.asInstanceOf[Object]).asInstanceOf[Builder].end())
+        Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
 
       case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startNot()
         val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object])
-        Some(method.invoke(bd, attribute, value.asInstanceOf[Object]).asInstanceOf[Builder].end())
+        Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end())
 
       case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startAnd()
@@ -217,7 +217,8 @@ private[orc] object OrcFilters extends Logging {
       case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
         val bd = builder.startAnd()
         val method = findMethod(bd.getClass, "in", classOf[String], classOf[Array[Object]])
-        Some(method.invoke(bd, attribute, values).asInstanceOf[Builder].end())
+        Some(method.invoke(bd, attribute, values.map(_.asInstanceOf[AnyRef]))
+          .asInstanceOf[Builder].end())
 
       case _ => None
     }
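Patch 09's `In` case converts the values with `values.map(_.asInstanceOf[AnyRef])` and passes the whole array as one argument bound to the builder's `Object[]` parameter. The subtlety it works around: `Method.invoke` is itself varargs, so an array of raw values handed to it carelessly can be misread as separate parameters, and a `Seq` of unboxed values will not match `Object[]` at all. A stand-alone sketch of the same call shape; the `in` method here is a stand-in for the SARG builder method, not the Hive API:

```scala
object VarargsInvokeSketch {
  // Stand-in for SearchArgument.Builder.in(attribute, values): takes the
  // value list as a single Object[] parameter, like the reflective target.
  def in(attribute: String, values: Array[Object]): String =
    attribute + " IN (" + values.mkString(", ") + ")"

  def main(args: Array[String]): Unit = {
    val values: Array[AnyRef] = Array(Integer.valueOf(1), Integer.valueOf(2))
    val method = getClass.getMethod("in", classOf[String], classOf[Array[Object]])
    // The values array must be passed as ONE element of invoke's argument
    // list; spreading it would make invoke treat each value as a parameter.
    val result = method.invoke(this, "id", values)
    println(result) // prints: id IN (1, 2)
  }
}
```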
From a3f7cff9ba3b2397f4d17ffa407463443b29aced Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 7 Apr 2019 10:06:23 +0800
Subject: [PATCH 10/11] Exclude org.eclipse.jetty.orbit:javax.servlet,
 org.apache.hbase:hbase-client and co.cask.tephra:*

---
 pom.xml                                       | 84 +++++++++++++++----
 sql/hive/pom.xml                              |  4 +
 .../sql/hive/client/HiveClientImpl.scala      |  2 +
 3 files changed, 76 insertions(+), 14 deletions(-)

diff --git a/pom.xml b/pom.xml
index 9461f9d66a1d..ed49a3d33ec0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@
 1.2.1.spark2
+ 2.3.4
 1.2.1
@@ -1414,7 +1415,7 @@
 commons-logging commons-logging
-
+
 org.eclipse.jetty.aggregate jetty-all
+
 org.apache.logging.log4j *
-
+
+ org.eclipse.jetty.orbit javax.servlet
+
+
+ org.apache.hive hive-storage-api
+
@@ -1544,7 +1555,7 @@
 org.json json
-
+
 ${hive.group} hive-llap-tez
+
 org.apache.calcite calcite-druid
 org.apache.calcite.avatica avatica
 org.apache.logging.log4j *
-
+
@@ -1673,6 +1684,17 @@
 org.slf4j slf4j-log4j12
+
+
+ org.apache.hbase hbase-client
+
+ co.cask.tephra *
+
@@ -1730,7 +1752,7 @@
 org.codehaus.groovy groovy-all
-
+
 org.apache.parquet parquet-hadoop-bundle
+
 tomcat jasper-compiler
 tomcat jasper-runtime
-
+
@@ -1811,21 +1833,22 @@
 org.codehaus.groovy groovy-all
-
+
 org.apache.logging.log4j log4j-slf4j-impl
-
+
+
-
+ org.apache.hive hive-llap-common ${hive23.version} ${hive.deps.scope}
+
+ org.apache.hive hive-common
+
+ org.apache.hive hive-serde
+
+ org.slf4j slf4j-api
+
+
+ org.apache.hive hive-llap-client ${hive23.version} test
+
+ org.apache.hive hive-common
+
+ org.apache.hive hive-serde
+
+ org.apache.hive hive-llap-common
+
+ org.apache.curator curator-framework
+
+ org.apache.curator apache-curator
+
+ org.apache.zookeeper zookeeper
+
+ org.slf4j slf4j-api
+
@@ -2741,11 +2797,11 @@
 3.4.13
 org.apache.hive
 core
- 2.3.4
- ${hive.version}
+ ${hive23.version}
+ 2.3.4
 ${parquet.version}
- org.apache.parquet
+
 4.1.17

diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 4d95ea1567a1..f627227aa038 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -223,6 +223,10 @@
 ${hive.group} hive-shims
+
+ org.apache.hive hive-llap-common
+
 org.apache.hive hive-llap-client

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 8132dee286b3..640cca0b24c7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -710,6 +710,8 @@ private[hive] class HiveClientImpl(
   /**
    * Execute the command using Hive and return the results as a sequence. Each element
    * in the sequence is one row.
+   * Since upgrading the built-in Hive to 2.3, hive-llap-client is needed when
+   * running MapReduce jobs with `runHive`.
    */
   protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState {
     logDebug(s"Running hiveql '$cmd'")

From 073c88347cf25e21dcf607fb85f636a37b99f7ba Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Mon, 8 Apr 2019 15:51:54 +0800
Subject: [PATCH 11/11] hive.parquet.version -> 1.8.1

---
 pom.xml | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index ed49a3d33ec0..da8e18ebce48 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1416,6 +1416,15 @@
 commons-logging
+
+
+ org.apache.orc orc-core
 org.eclipse.jetty.aggregate jetty-all
@@ -1843,7 +1852,7 @@
-
+ org.apache.hive
@@ -2799,8 +2808,8 @@
 core
 ${hive23.version}
 2.3.4
- org.apache.parquet
- ${parquet.version}
+ 1.8.1
 org.apache.parquet
 4.1.17