diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2 index 326d0857a7c0..04526f2ab918 100644 --- a/dev/deps/spark-deps-hadoop-3.2 +++ b/dev/deps/spark-deps-hadoop-3.2 @@ -50,7 +50,7 @@ curator-client-2.13.0.jar curator-framework-2.13.0.jar curator-recipes-2.13.0.jar datanucleus-api-jdo-3.2.6.jar -datanucleus-core-3.2.10.jar +datanucleus-core-4.1.17.jar datanucleus-rdbms-3.2.9.jar derby-10.12.1.1.jar dnsjava-2.1.7.jar @@ -76,6 +76,7 @@ hadoop-yarn-common-3.2.0.jar hadoop-yarn-registry-3.2.0.jar hadoop-yarn-server-common-3.2.0.jar hadoop-yarn-server-web-proxy-3.2.0.jar +hive-storage-api-2.6.0.jar hk2-api-2.4.0-b34.jar hk2-locator-2.4.0-b34.jar hk2-utils-2.4.0-b34.jar diff --git a/pom.xml b/pom.xml index 62eb16db4181..da8e18ebce48 100644 --- a/pom.xml +++ b/pom.xml @@ -128,6 +128,7 @@ 1.2.1.spark2 + 2.3.4 1.2.1 @@ -1414,6 +1415,37 @@ commons-logging commons-logging + + + + org.apache.orc + orc-core + + + + org.eclipse.jetty.aggregate + jetty-all + + + + org.apache.logging.log4j + * + + + + org.eclipse.jetty.orbit + javax.servlet + + + + org.apache.hive + hive-storage-api + + @@ -1532,6 +1564,27 @@ org.json json + + + + ${hive.group} + hive-llap-tez + + + + org.apache.calcite + calcite-druid + + + org.apache.calcite.avatica + avatica + + + + org.apache.logging.log4j + * + + @@ -1640,6 +1693,17 @@ org.slf4j slf4j-log4j12 + + + + org.apache.hbase + hbase-client + + + co.cask.tephra + * + + @@ -1697,6 +1761,22 @@ org.codehaus.groovy groovy-all + + + + org.apache.parquet + parquet-hadoop-bundle + + + + tomcat + jasper-compiler + + + tomcat + jasper-runtime + + @@ -1762,8 +1842,76 @@ org.codehaus.groovy groovy-all + + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + + + + + org.apache.hive + hive-llap-common + ${hive23.version} + ${hive.deps.scope} + + + org.apache.hive + hive-common + + + org.apache.hive + hive-serde + + + org.slf4j + slf4j-api + + + + org.apache.hive + hive-llap-client + ${hive23.version} + test + + + org.apache.hive + hive-common + + + org.apache.hive + hive-serde + + + org.apache.hive + hive-llap-common + + + org.apache.curator + curator-framework + + + org.apache.curator + apache-curator + + + org.apache.zookeeper + zookeeper + + + org.slf4j + slf4j-api + + + + org.apache.orc orc-core @@ -2656,7 +2804,23 @@ 3.2.0 2.13.0 3.4.13 + org.apache.hive + core + ${hive23.version} + 2.3.4 + org.apache.parquet + 1.8.1 + + 4.1.17 + + + + org.apache.hive + hive-storage-api + 2.6.0 + + diff --git a/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java index 9bfad1e83ee7..2f1925e69a33 100644 --- a/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java +++ b/sql/core/v2.3.4/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java @@ -19,7 +19,7 @@ import java.math.BigDecimal; -import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.hadoop.hive.ql.exec.vector.*; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; diff --git a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 112dcb2cb238..85d61bccc8e3 100644 --- a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql.execution.datasources.orc -import org.apache.orc.storage.common.`type`.HiveDecimal -import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} -import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder -import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder -import org.apache.orc.storage.serde2.io.HiveDecimalWritable +import org.apache.hadoop.hive.common.`type`.HiveDecimal +import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types._ diff --git a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala index 68503aba22b4..c32f024476e6 100644 --- a/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala +++ b/sql/core/v2.3.4/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcShimUtils.scala @@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.datasources.orc import java.sql.Date -import org.apache.orc.storage.common.`type`.HiveDecimal -import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch -import org.apache.orc.storage.ql.io.sarg.{SearchArgument => OrcSearchArgument} -import org.apache.orc.storage.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator} -import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable} +import org.apache.hadoop.hive.common.`type`.HiveDecimal +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch +import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument => OrcSearchArgument} +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator} +import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable} import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.types.Decimal diff --git a/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index e96c6fb7716c..1ed42f1c5f75 100644 --- a/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/v2.3.4/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -23,7 +23,7 @@ import java.sql.{Date, Timestamp} import scala.collection.JavaConverters._ -import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument} +import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument} import org.apache.spark.sql.{AnalysisException, Column, DataFrame} import org.apache.spark.sql.catalyst.dsl.expressions._ diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 55afbe7c5d65..f627227aa038 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -208,6 +208,31 @@ + + hadoop-3.2 + + + ${hive.group} + hive-common + + + ${hive.group} + hive-serde + + + ${hive.group} + hive-shims + + + org.apache.hive + hive-llap-common + + + org.apache.hive + hive-llap-client + + + diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala index c9fc3d4a02c4..be4a0c175b6d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive import java.io.{InputStream, OutputStream} +import java.lang.reflect.Method import java.rmi.server.UID import scala.collection.JavaConverters._ @@ -28,15 +29,13 @@ import com.google.common.base.Objects import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hadoop.hive.ql.exec.{UDF, Utilities} +import org.apache.hadoop.hive.ql.exec.UDF import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro import org.apache.hadoop.hive.serde2.ColumnProjectionUtils import org.apache.hadoop.hive.serde2.avro.{AvroGenericRecordWritable, AvroSerdeUtils} import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector import org.apache.hadoop.io.Writable -import org.apache.hive.com.esotericsoftware.kryo.Kryo -import org.apache.hive.com.esotericsoftware.kryo.io.{Input, Output} import org.apache.spark.internal.Logging import org.apache.spark.sql.types.Decimal @@ -146,34 +145,60 @@ private[hive] object HiveShim { case _ => false } - @transient - def deserializeObjectByKryo[T: ClassTag]( - kryo: Kryo, - in: InputStream, - clazz: Class[_]): T = { - val inp = new Input(in) - val t: T = kryo.readObject(inp, clazz).asInstanceOf[T] - inp.close() - t - } + private lazy val serUtilClass = + Utils.classForName("org.apache.hadoop.hive.ql.exec.SerializationUtilities") + private lazy val utilClass = Utils.classForName("org.apache.hadoop.hive.ql.exec.Utilities") + private val deserializeMethodName = "deserializeObjectByKryo" + private val serializeMethodName = "serializeObjectByKryo" - @transient - def serializeObjectByKryo( - kryo: Kryo, - plan: Object, - out: OutputStream) { - val output: Output = new Output(out) - kryo.writeObject(output, plan) - output.close() + private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = { + val method = klass.getDeclaredMethod(name, args: _*) + method.setAccessible(true) + method } def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = { - deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz) - .asInstanceOf[UDFType] + if (HiveUtils.isHive23) { + val borrowKryo = serUtilClass.getMethod("borrowKryo") + val kryo = borrowKryo.invoke(serUtilClass) + val deserializeObjectByKryo = findMethod(serUtilClass, deserializeMethodName, + kryo.getClass.getSuperclass, classOf[InputStream], classOf[Class[_]]) + try { + deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType] + } finally { + serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo) + } + } else { + val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo") + val threadLocalValue = runtimeSerializationKryo.get(utilClass) + val getMethod = threadLocalValue.getClass.getMethod("get") + val kryo = getMethod.invoke(threadLocalValue) + val deserializeObjectByKryo = findMethod(utilClass, deserializeMethodName, + kryo.getClass, classOf[InputStream], classOf[Class[_]]) + deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType] + } } def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = { - serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out) + if (HiveUtils.isHive23) { + val borrowKryo = serUtilClass.getMethod("borrowKryo") + val kryo = borrowKryo.invoke(serUtilClass) + val serializeObjectByKryo = findMethod(serUtilClass, serializeMethodName, + kryo.getClass.getSuperclass, classOf[Object], classOf[OutputStream]) + try { + serializeObjectByKryo.invoke(null, kryo, function, out) + } finally { + serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo) + } + } else { + val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo") + val threadLocalValue = runtimeSerializationKryo.get(utilClass) + val getMethod = threadLocalValue.getClass.getMethod("get") + val kryo = getMethod.invoke(threadLocalValue) + val serializeObjectByKryo = findMethod(utilClass, serializeMethodName, + kryo.getClass, classOf[Object], classOf[OutputStream]) + serializeObjectByKryo.invoke(null, kryo, function, out) + } } def writeExternal(out: java.io.ObjectOutput) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 01a503db78dd..773bf312031a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.util.VersionInfo +import org.apache.hive.common.util.HiveVersionInfo import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil @@ -54,8 +55,11 @@ private[spark] object HiveUtils extends Logging { sc } + private val hiveVersion = HiveVersionInfo.getVersion + val isHive23: Boolean = hiveVersion.startsWith("2.3") + /** The version of hive used internally by Spark SQL. */ - val builtinHiveVersion: String = "1.2.1" + val builtinHiveVersion: String = if (isHive23) hiveVersion else "1.2.1" val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version") .doc("Version of the Hive metastore. Available options are " + diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 8132dee286b3..640cca0b24c7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -710,6 +710,8 @@ private[hive] class HiveClientImpl( /** * Execute the command using Hive and return the results as a sequence. Each element * in the sequence is one row. + * Since upgrading the built-in Hive to 2.3, hive-llap-client is needed when + * running MapReduce jobs with `runHive`. */ protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState { logDebug(s"Running hiveql '$cmd'") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 8ece4b5caef6..0938576a7147 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hive +import java.lang.{Boolean => JBoolean} import java.nio.ByteBuffer import scala.collection.JavaConverters._ @@ -38,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.hive.HiveShim._ import org.apache.spark.sql.types._ - +import org.apache.spark.util.Utils private[hive] case class HiveSimpleUDF( name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) @@ -336,8 +337,20 @@ private[hive] case class HiveUDAFFunction( funcWrapper.createFunction[AbstractGenericUDAFResolver]() } - val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false) - resolver.getEvaluator(parameterInfo) + val clazz = Utils.classForName(classOf[SimpleGenericUDAFParameterInfo].getName) + if (HiveUtils.isHive23) { + val ctor = clazz.getDeclaredConstructor( + classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE) + val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE, JBoolean.FALSE) + val parameterInfo = ctor.newInstance(args: _*).asInstanceOf[SimpleGenericUDAFParameterInfo] + resolver.getEvaluator(parameterInfo) + } else { + val ctor = clazz.getDeclaredConstructor( + classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE) + val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE) + val parameterInfo = ctor.newInstance(args: _*).asInstanceOf[SimpleGenericUDAFParameterInfo] + resolver.getEvaluator(parameterInfo) + } } private case class HiveEvaluator( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index a82576a233ac..dfac73ce62fa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -17,12 +17,16 @@ package org.apache.spark.sql.hive.orc +import java.lang.reflect.Method + import org.apache.hadoop.hive.ql.io.sarg.SearchArgument import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.datasources.orc.{OrcFilters => DatasourceOrcFilters} import org.apache.spark.sql.execution.datasources.orc.OrcFilters.buildTree +import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -57,22 +61,33 @@ import org.apache.spark.sql.types._ * known to be convertible. */ private[orc] object OrcFilters extends Logging { + + private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = { + val method = klass.getMethod(name, args: _*) + method.setAccessible(true) + method + } + def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = { - val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap - - // First, tries to convert each filter individually to see whether it's convertible, and then - // collect all convertible ones to build the final `SearchArgument`. - val convertibleFilters = for { - filter <- filters - _ <- buildSearchArgument(dataTypeMap, filter, newBuilder) - } yield filter - - for { - // Combines all convertible filters using `And` to produce a single conjunction - conjunction <- buildTree(convertibleFilters) - // Then tries to build a single ORC `SearchArgument` for the conjunction predicate - builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder) - } yield builder.build() + if (HiveUtils.isHive23) { + DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]] + } else { + val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap + + // First, tries to convert each filter individually to see whether it's convertible, and then + // collect all convertible ones to build the final `SearchArgument`. + val convertibleFilters = for { + filter <- filters + _ <- buildSearchArgument(dataTypeMap, filter, newBuilder) + } yield filter + + for { + // Combines all convertible filters using `And` to produce a single conjunction + conjunction <- buildTree(convertibleFilters) + // Then tries to build a single ORC `SearchArgument` for the conjunction predicate + builder <- buildSearchArgument(dataTypeMap, conjunction, newBuilder) + } yield builder.build() + } } private def buildSearchArgument( @@ -160,31 +175,50 @@ private[orc] object OrcFilters extends Logging { // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - Some(builder.startAnd().equals(attribute, value).end()) + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "equals", classOf[String], classOf[Object]) + Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - Some(builder.startAnd().nullSafeEquals(attribute, value).end()) + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "nullSafeEquals", classOf[String], classOf[Object]) + Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - Some(builder.startAnd().lessThan(attribute, value).end()) + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object]) + Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - Some(builder.startAnd().lessThanEquals(attribute, value).end()) + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object]) + Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - Some(builder.startNot().lessThanEquals(attribute, value).end()) + val bd = builder.startNot() + val method = findMethod(bd.getClass, "lessThanEquals", classOf[String], classOf[Object]) + Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) => - Some(builder.startNot().lessThan(attribute, value).end()) + val bd = builder.startNot() + val method = findMethod(bd.getClass, "lessThan", classOf[String], classOf[Object]) + Some(method.invoke(bd, attribute, value.asInstanceOf[AnyRef]).asInstanceOf[Builder].end()) case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - Some(builder.startAnd().isNull(attribute).end()) + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "isNull", classOf[String]) + Some(method.invoke(bd, attribute).asInstanceOf[Builder].end()) case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) => - Some(builder.startNot().isNull(attribute).end()) + val bd = builder.startNot() + val method = findMethod(bd.getClass, "isNull", classOf[String]) + Some(method.invoke(bd, attribute).asInstanceOf[Builder].end()) case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) => - Some(builder.startAnd().in(attribute, values.map(_.asInstanceOf[AnyRef]): _*).end()) + val bd = builder.startAnd() + val method = findMethod(bd.getClass, "in", classOf[String], classOf[Array[Object]]) + Some(method.invoke(bd, attribute, values.map(_.asInstanceOf[AnyRef])) + .asInstanceOf[Builder].end()) case _ => None }