From 2d760dc8f5e259172ecefe0a52733624654cd9cd Mon Sep 17 00:00:00 2001 From: attilapiros Date: Fri, 4 Nov 2022 14:42:10 -0700 Subject: [PATCH] Initial version --- .../org/apache/spark/sql/hive/TableReader.scala | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 6a6ff6ca948cf..c5e8a9f17640c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -299,6 +299,16 @@ class HadoopTableReader( } } + /** + * True if the new org.apache.hadoop.mapreduce.InputFormat is implemented (except + * HiveHBaseTableInputFormat where although the new interface is implemented by base HBase class + * the table initialization in the Hive layer only happens via the old interface methods - + * for more details see SPARK-32380). + */ + private def compatibleWithNewHadoopRDD(inputClass: Class[_ <: oldInputClass[_, _]]): Boolean = + classOf[newInputClass[_, _]].isAssignableFrom(inputClass) && + !inputClass.getName.equalsIgnoreCase("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat") + /** + * The entry of creating a RDD. + * [SPARK-26630] Using which HadoopRDD will be decided by the input format of tables. 
@@ -307,7 +317,7 @@ class HadoopTableReader( */ private def createHadoopRDD(localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = { val inputFormatClazz = localTableDesc.getInputFileFormatClass - if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) { + if (compatibleWithNewHadoopRDD(inputFormatClazz)) { createNewHadoopRDD(localTableDesc, inputPathStr) } else { createOldHadoopRDD(localTableDesc, inputPathStr) @@ -316,7 +326,7 @@ class HadoopTableReader( private def createHadoopRDD(partitionDesc: PartitionDesc, inputPathStr: String): RDD[Writable] = { val inputFormatClazz = partitionDesc.getInputFileFormatClass - if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) { + if (compatibleWithNewHadoopRDD(inputFormatClazz)) { createNewHadoopRDD(partitionDesc, inputPathStr) } else { createOldHadoopRDD(partitionDesc, inputPathStr)