Conversation

@DeyinZhong

What changes were proposed in this pull request?

This PR modifies TableReader.scala to create an OldHadoopRDD when the input format is 'org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat', because the default NewHadoopRDD cannot access the HBase-backed table.
Reference link: https://issues.apache.org/jira/browse/SPARK-32380
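
For illustration, here is a minimal sketch of the kind of check this change adds, written as a standalone helper (hypothetical helper name; the actual change lives inside the existing logic in TableReader.scala):

```scala
import org.apache.hadoop.hive.ql.plan.TableDesc

// Hypothetical helper: only use the new-API NewHadoopRDD when the input format
// is a mapreduce InputFormat AND it is not HiveHBaseTableInputFormat, whose
// new-API getSplits(JobContext) path skips the required HBase initialization.
def useNewHadoopRDD(tableDesc: TableDesc): Boolean = {
  val inputFormatClazz = tableDesc.getInputFileFormatClass
  val isHiveHBase = inputFormatClazz.getName
    .equalsIgnoreCase("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat")
  !isHiveHBase &&
    classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)
}
```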

  • environments:
    hadoop 2.8.5
    hive 2.3.7
    spark 3.0.0
    hbase 1.4.9

Why are the changes needed?

Spark SQL currently fails with an exception when querying a Hive table whose data is stored in HBase; this change fixes that bug.

Does this PR introduce any user-facing change?

no

How was this patch tested?

  • step1: create hbase table
 hbase(main):001:0>create 'hbase_test1', 'cf1'
 hbase(main):001:0> put 'hbase_test', 'r1', 'cf1:c1', '123'

  • step2: create hive table related to hbase table

hive>

CREATE EXTERNAL TABLE `hivetest.hbase_test`(
  `key` string COMMENT '', 
  `value` string COMMENT '')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.hbase.HBaseSerDe' 
STORED BY 
  'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
WITH SERDEPROPERTIES ( 
  'hbase.columns.mapping'=':key,cf1:v1', 
  'serialization.format'='1')
TBLPROPERTIES (
  'hbase.table.name'='hbase_test')

 

  • step3: query the hive table from spark-sql while the data is in hbase

spark-sql --master yarn -e "select * from hivetest.hbase_test"

Member

@dongjoon-hyun dongjoon-hyun left a comment


Thank you for making your first contribution, @DeyinZhong. So, is this only an Apache Spark 3.0.0 issue?

IIRC, Apache Spark doesn't officially support Hive storage handlers, although they have worked in some cases when the table was created from the Hive side. We don't have test coverage for this. So, I guess HBaseStorageHandler (this PR), DruidStorageHandler, and others might be in the same situation. In that case, we need a more general solution instead of one-by-one exception handling like !inputFormatClazz.getName.equalsIgnoreCase("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat").

if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
if (!inputFormatClazz.getName.
equalsIgnoreCase("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat")
&& classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
Member


Do you think we can have a test case for this?

Author


I tried Spark 2.4.3 and it works well.
Spark 3.0.0 will create a NewHadoopRDD and call 'val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala' in the getPartitions method; this calls TableInputFormatBase.getSplits(JobContext context), but the table variable is null there, so the exception is thrown.

@dongjoon-hyun
Member

dongjoon-hyun commented Jul 21, 2020

Since this might be a regression due to SPARK-26630, cc @gatorsmile and @HyukjinKwon, too.

@dongjoon-hyun
Member

ok to test

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-32380] fixed spark3.0 access hive table while data in hbase problem [SPARK-32380][SQL] fixed spark3.0 access hive table while data in hbase problem Jul 21, 2020
@SparkQA

SparkQA commented Jul 21, 2020

Test build #126261 has finished for PR 29178 at commit d170e24.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val inputFormatClazz = localTableDesc.getInputFileFormatClass
if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
if (!inputFormatClazz.getName.
equalsIgnoreCase("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat")
Member


Do you know why org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat implements the new Hadoop InputFormat interface but still doesn't work?


It looks like the new MapReduce API (org.apache.hadoop.mapreduce) is used when creating a NewHadoopRDD, but the getSplits method of HiveHBaseTableInputFormat is implemented with the org.apache.hadoop.mapred API, so some initialization (Table, Connection) is not done and the obtained table variable is null. When createOldHadoopRDD is used instead, the org.apache.hadoop.mapred API is used, that initialization does happen, and it works well.
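
To make the two call paths concrete, here is a rough sketch of how the two RDD flavours invoke getSplits (simplified for illustration; the real calls live in NewHadoopRDD.getPartitions and HadoopRDD.getPartitions):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{InputFormat => OldInputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, JobID}
import org.apache.hadoop.mapreduce.task.JobContextImpl

// New API path (NewHadoopRDD): getSplits(JobContext). HiveHBaseTableInputFormat
// never runs initializeTable on this path, so TableInputFormatBase fails.
def newApiSplits(fmt: NewInputFormat[_, _], conf: Configuration) =
  fmt.getSplits(new JobContextImpl(conf, new JobID()))

// Old API path (HadoopRDD): getSplits(JobConf, numSplits). Hive's override
// calls initializeTable(conn, tableName) before asking HBase for the splits.
def oldApiSplits(fmt: OldInputFormat[_, _], conf: JobConf, minSplits: Int) =
  fmt.getSplits(conf, minSplits)
```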

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@SparkQA

SparkQA commented Jan 8, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38443/

@SparkQA

SparkQA commented Jan 8, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38443/

@SparkQA

SparkQA commented Jan 8, 2021

Test build #133854 has finished for PR 29178 at commit d170e24.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

HyukjinKwon pushed a commit that referenced this pull request Nov 5, 2022
### What changes were proposed in this pull request?

This is an update of #29178, which was closed because the root cause of the error was only vaguely defined there; here I will explain why `HiveHBaseTableInputFormat` does not work well with `NewHadoopRDD` (see the next section).

This PR modifies `TableReader.scala` to create an `OldHadoopRDD` when the input format is 'org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat'.

- environments (Cloudera distribution 7.1.7.SP1):
hadoop 3.1.1
hive 3.1.300
spark 3.2.1
hbase 2.2.3

### Why are the changes needed?

With the `NewHadoopRDD` the following exception is raised:

```
java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:253)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:446)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
  at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:806)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:765)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:774)
  ... 47 elided
Caused by: java.lang.IllegalStateException: The input format instance has not been properly initialized. Ensure you call initializeTable either in your constructor or initialize method
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:557)
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:248)
  ... 86 more
```

### Short summary of the root cause

There are two interfaces:

- the new `org.apache.hadoop.mapreduce.InputFormat`: providing a one-arg method `getSplits(JobContext context)` (returning `List<InputSplit>`)
- the old `org.apache.hadoop.mapred.InputFormat`: providing a two-arg method `getSplits(JobConf job, int numSplits)` (returning `InputSplit[]`)

In Hive both are implemented by `HiveHBaseTableInputFormat`, but only the old method performs the required initialisation, and this is why `NewHadoopRDD` fails here.

### Detailed analyses

All links here refer to the latest commits on the master branches of the mentioned components at the time of writing this description (so the line numbers stay correct in the future, since `master` itself is a moving target).

Spark, in `NewHadoopRDD`, uses the new interface with the one-arg method:
https://github.com/apache/spark/blob/5556cfc59aa97a3ad4ea0baacebe19859ec0bcb7/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136

Hive, on the other hand, binds the initialisation to the two-arg method coming from the old interface.
See [Hive#getSplits](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L268):

```
  @Override
  public InputSplit[] getSplits(final JobConf jobConf, final int numSplits) throws IOException {
```

This calls `getSplitsInternal` which contains the [initialisation](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L299) too:
```
    initializeTable(conn, tableName);
```

Interestingly, Hive also uses the one-arg method internally within `getSplitsInternal` [here](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L356), but by that point the initialisation has already been done.

By calling the new interface method (which is what `NewHadoopRDD` does), the call goes straight to the HBase method: [org.apache.hadoop.hbase.mapreduce.TableInputFormatBase#getSplits](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L230).

There, a `JobContext`-based [initialisation](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L234-L237) would be invoked,
which by default is an [empty method](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L628-L640):
```java
  /**
   * Handle subclass specific set up. Each of the entry points used by the MapReduce framework,
   * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
   * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
   * retrieving the necessary configuration information and calling
   * {@link #initializeTable(Connection, TableName)}. Subclasses should implement their initialize
   * call such that it is safe to call multiple times. The current TableInputFormatBase
   * implementation relies on a non-null table reference to decide if an initialize call is needed,
   * but this behavior may change in the future. In particular, it is critical that initializeTable
   * not be called multiple times since this will leak Connection instances.
   */
  protected void initialize(JobContext context) throws IOException {
  }
```

This is not overridden by Hive, and it is hard to justify requiring that there (it is an internal Hive class), so it is easier to fix this in Spark.
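
For comparison, the new-API path would only succeed if the input format itself overrode `initialize(JobContext)`, roughly like this hypothetical subclass (a sketch only, not Hive's or HBase's actual code; the configuration key is assumed to be `hbase.mapreduce.inputtable`):

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase
import org.apache.hadoop.mapreduce.JobContext

// Hypothetical subclass: what an input format would have to do for the new-API
// getSplits(JobContext) to find an initialized table. The base class only calls
// initialize(...) when its table reference is still null.
class InitializingTableInputFormat extends TableInputFormatBase {
  override def initialize(context: JobContext): Unit = {
    val conf = HBaseConfiguration.create(context.getConfiguration)
    val connection = ConnectionFactory.createConnection(conf)
    // "hbase.mapreduce.inputtable" is assumed here as the table name key
    initializeTable(connection, TableName.valueOf(conf.get("hbase.mapreduce.inputtable")))
  }
}
```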

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

1) create hbase table

```
 hbase(main):001:0>create 'hbase_test1', 'cf1'
 hbase(main):001:0> put 'hbase_test', 'r1', 'cf1:c1', '123'
```

2) create hive table related to hbase table

hive>
```
CREATE EXTERNAL TABLE `hivetest.hbase_test`(
  `key` string COMMENT '',
  `value` string COMMENT '')
ROW FORMAT SERDE
  'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY
  'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
  'hbase.columns.mapping'=':key,cf1:v1',
  'serialization.format'='1')
TBLPROPERTIES (
  'hbase.table.name'='hbase_test')
```
 

3) query the hive table from spark-shell while the data is in HBase

```
scala> spark.sql("select * from hivetest.hbase_test").show()
22/11/05 01:14:16 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
22/11/05 01:14:16 WARN client.HiveClientImpl: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = f05b6866-86df-4d88-9eea-f1c45043bb5f
+---+-----+
|key|value|
+---+-----+
| r1|  123|
+---+-----+
```

Closes #38516 from attilapiros/SPARK-32380.

Authored-by: attilapiros <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Nov 5, 2022
HyukjinKwon pushed a commit that referenced this pull request Nov 5, 2022
a0x8o added a commit to a0x8o/spark that referenced this pull request Nov 5, 2022
SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
a0x8o added a commit to a0x8o/spark that referenced this pull request Dec 30, 2022
### What changes were proposed in this pull request?

This is an update of apache/spark#29178 which was closed because the root cause of the error was just vaguely defined there but here I will give an explanation why `HiveHBaseTableInputFormat` does not work well with the `NewHadoopRDD` (see in the next section).

The PR modify `TableReader.scala` to create `OldHadoopRDD` when input format is 'org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat'.

- environments (Cloudera distribution 7.1.7.SP1):
hadoop 3.1.1
hive 3.1.300
spark 3.2.1
hbase 2.2.3

### Why are the changes needed?

With the `NewHadoopRDD` the following exception is raised:

```
java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:253)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:446)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
  at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:806)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:765)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:774)
  ... 47 elided
Caused by: java.lang.IllegalStateException: The input format instance has not been properly initialized. Ensure you call initializeTable either in your constructor or initialize method
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:557)
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:248)
  ... 86 more
```

### Short summary of the root cause

There are two interfaces:

- the new `org.apache.hadoop.mapreduce.InputFormat`: providing a one arg method `getSplits(JobContext context)` (returning `List<InputSplit>`)
- the old `org.apache.hadoop.mapred.InputFormat`: providing a two arg method `getSplits(JobConf job, int numSplits)` (returning `InputSplit[]`)

And in Hive both are implemented by `HiveHBaseTableInputFormat` but only the old method leads to required initialisation and this why `NewHadoopRDD` fails here.

### Detailed analyses

Here all the link refers latest commits of the master branches for the mentioned components at the time of writing this description (to get the right line numbers in the future too as `master` itself is a moving target).

Spark in `NewHadoopRDD` uses the new interface providing the one arg method:
https://github.com/apache/spark/blob/5556cfc59aa97a3ad4ea0baacebe19859ec0bcb7/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136

Hive on the other hand binds the initialisation to the two args method coming from the old interface.
See [Hive#getSplits](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L268):

```java
  @Override public InputSplit[] getSplits(final JobConf jobConf, final int numSplits) throws IOException {
```

This calls `getSplitsInternal` which contains the [initialisation](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L299) too:
```
    initializeTable(conn, tableName);
```

Interestingly, Hive also uses the one-arg method internally within `getSplitsInternal` [here](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L356), but by then the initialisation has already been done.

When the new interface method is called (which is what `NewHadoopRDD` does), the call goes straight to the HBase method [org.apache.hadoop.hbase.mapreduce.TableInputFormatBase#getSplits](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L230).

There a `JobContext`-based [initialisation](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L234-L237) would take place,
but by default it is an [empty method](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L628-L640):
```java
  /**
   * Handle subclass specific set up. Each of the entry points used by the MapReduce framework,
   * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
   * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
   * retrieving the necessary configuration information and calling
   * {@link #initializeTable(Connection, TableName)}. Subclasses should implement their initialize
   * call such that it is safe to call multiple times. The current TableInputFormatBase
   * implementation relies on a non-null table reference to decide if an initialize call is needed,
   * but this behavior may change in the future. In particular, it is critical that initializeTable
   * not be called multiple times since this will leak Connection instances.
   */
  protected void initialize(JobContext context) throws IOException {
  }
```
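For comparison, a subclass that wanted the new-API path to work would have to supply its own `initialize`; a hypothetical Scala sketch (not part of Hive, HBase, or this PR; it reads the plain HBase `TableInputFormat.INPUT_TABLE` key, whereas the Hive handler derives the table name from its own properties) might look like this:

```scala
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableInputFormatBase}
import org.apache.hadoop.mapreduce.JobContext

// Hypothetical subclass, for illustration only.
class InitializingTableInputFormat extends TableInputFormatBase {
  private var initialized = false

  override def initialize(context: JobContext): Unit = {
    // Must be safe to call more than once: initializeTable owns a Connection,
    // and calling it twice would leak one (see the javadoc above).
    if (!initialized) {
      val conf = context.getConfiguration
      val tableName = TableName.valueOf(conf.get(TableInputFormat.INPUT_TABLE))
      initializeTable(ConnectionFactory.createConnection(conf), tableName)
      initialized = true
    }
  }
}
```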

Hive does not override this method, and since `HiveHBaseTableInputFormat` is an internal Hive class it is hard to argue that it should, so it is easier to fix this on the Spark side.
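The change in `TableReader.scala` essentially boils down to a check along the following lines (a simplified sketch, not the exact diff; the object and method names are made up):

```scala
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

// Simplified sketch: use NewHadoopRDD only when the input format implements the
// new mapreduce interface and is not the Hive HBase handler, which relies on the
// old mapred entry point for its table initialisation.
object InputFormatCompat {
  def canUseNewHadoopRDD(inputFormatClazz: Class[_]): Boolean =
    classOf[NewInputFormat[_, _]].isAssignableFrom(inputFormatClazz) &&
      inputFormatClazz.getName != "org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat"
}

// When this returns false for HiveHBaseTableInputFormat, the reader falls back to
// the old mapred-based HadoopRDD, whose two-arg getSplits initialises the table.
```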

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

1) create hbase table

```
 hbase(main):001:0> create 'hbase_test', 'cf1'
 hbase(main):001:0> put 'hbase_test', 'r1', 'cf1:v1', '123'
```

2) create hive table related to hbase table

hive>
```
CREATE EXTERNAL TABLE `hivetest.hbase_test`(
  `key` string COMMENT '',
  `value` string COMMENT '')
ROW FORMAT SERDE
  'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY
  'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
  'hbase.columns.mapping'=':key,cf1:v1',
  'serialization.format'='1')
TBLPROPERTIES (
  'hbase.table.name'='hbase_test')
```

3) query the hive table (whose data is in HBase) from spark-shell

```
scala> spark.sql("select * from hivetest.hbase_test").show()
22/11/05 01:14:16 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
22/11/05 01:14:16 WARN client.HiveClientImpl: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = f05b6866-86df-4d88-9eea-f1c45043bb5f
+---+-----+
|key|value|
+---+-----+
| r1|  123|
+---+-----+
```

Closes #38516 from attilapiros/SPARK-32380.

Authored-by: attilapiros <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
a0x8o added a commit to a0x8o/spark that referenced this pull request Dec 30, 2022
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023