diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 6a6ff6ca948c..c5e8a9f17640 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -299,6 +299,16 @@ class HadoopTableReader(
     }
   }
 
+  /**
+   * True if the new org.apache.hadoop.mapreduce.InputFormat is implemented (except
+   * HiveHBaseTableInputFormat where, although the new interface is implemented by the base HBase
+   * class, the table initialization in the Hive layer only happens via the old interface methods;
+   * for more details see SPARK-32380).
+   */
+  private def compatibleWithNewHadoopRDD(inputClass: Class[_ <: oldInputClass[_, _]]): Boolean =
+    classOf[newInputClass[_, _]].isAssignableFrom(inputClass) &&
+      !inputClass.getName.equalsIgnoreCase("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat")
+
   /**
    * The entry of creating a RDD.
    * [SPARK-26630] Using which HadoopRDD will be decided by the input format of tables.
@@ -307,7 +317,7 @@ class HadoopTableReader(
    */
   private def createHadoopRDD(localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
     val inputFormatClazz = localTableDesc.getInputFileFormatClass
-    if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
+    if (compatibleWithNewHadoopRDD(inputFormatClazz)) {
      createNewHadoopRDD(localTableDesc, inputPathStr)
    } else {
      createOldHadoopRDD(localTableDesc, inputPathStr)
@@ -316,7 +326,7 @@ class HadoopTableReader(
 
   private def createHadoopRDD(partitionDesc: PartitionDesc, inputPathStr: String): RDD[Writable] = {
     val inputFormatClazz = partitionDesc.getInputFileFormatClass
-    if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
+    if (compatibleWithNewHadoopRDD(inputFormatClazz)) {
      createNewHadoopRDD(partitionDesc, inputPathStr)
    } else {
      createOldHadoopRDD(partitionDesc, inputPathStr)
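
For reviewers, the dispatch above comes down to a runtime isAssignableFrom check: an input format class that also implements the new org.apache.hadoop.mapreduce.InputFormat interface (the file's newInputClass alias) is routed to createNewHadoopRDD, while everything else, plus the HiveHBaseTableInputFormat special case, stays on the old HadoopRDD path. Below is a minimal, self-contained Scala sketch of that check; the toy traits and class names (OldInputFormat, NewInputFormat, OldOnlyFormat, BridgeFormat) are hypothetical stand-ins for illustration only, not Spark, Hadoop, or Hive classes.

```scala
// Illustrative sketch only (not part of the patch). The toy traits stand in for
// org.apache.hadoop.mapred.InputFormat (old API) and
// org.apache.hadoop.mapreduce.InputFormat (new API); all names are hypothetical.
object CompatCheckSketch {
  trait OldInputFormat                                             // stand-in for the old mapred interface
  trait NewInputFormat                                             // stand-in for the new mapreduce interface

  class OldOnlyFormat extends OldInputFormat                       // implements only the old API
  class BridgeFormat extends OldInputFormat with NewInputFormat    // implements both APIs

  // Same shape as compatibleWithNewHadoopRDD in the patch: true only when the
  // class also implements the new interface and is not the HBase special case.
  def compatibleWithNewHadoopRDD(inputClass: Class[_ <: OldInputFormat]): Boolean =
    classOf[NewInputFormat].isAssignableFrom(inputClass) &&
      !inputClass.getName.equalsIgnoreCase("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat")

  def main(args: Array[String]): Unit = {
    println(compatibleWithNewHadoopRDD(classOf[OldOnlyFormat]))    // false -> old HadoopRDD path
    println(compatibleWithNewHadoopRDD(classOf[BridgeFormat]))     // true  -> new HadoopRDD path
  }
}
```

Since HiveHBaseTableInputFormat does inherit the new interface from its HBase base class, the name-based exclusion is what keeps HBase-backed Hive tables on the old HadoopRDD path, which is the behaviour SPARK-32380 calls for.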