attilapiros (Contributor) commented on Nov 5, 2022:

What changes were proposed in this pull request?

This is an update of #29178, which was closed because the root cause of the error was only vaguely defined there; here I give an explanation of why `HiveHBaseTableInputFormat` does not work well with `NewHadoopRDD` (see the next section).

The PR modifies `TableReader.scala` to create an `OldHadoopRDD` when the input format is 'org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat'; a minimal sketch of the idea follows the environment list below.

  • environments (Cloudera distribution 7.1.7.SP1):
    hadoop 3.1.1
    hive 3.1.300
    spark 3.2.1
    hbase 2.2.3
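To make the change concrete, here is a minimal, hedged sketch of the dispatch idea; the object and method names are assumptions for illustration, not the exact `TableReader.scala` diff:

```scala
// Illustrative sketch: route the HBase handler's input format to the old
// mapred-based RDD, whose two-arg getSplits performs the table initialization.
object RddDispatchSketch {
  private val HiveHBaseInputFormat =
    "org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat"

  // True when the old mapred-based OldHadoopRDD code path should be used.
  def useOldHadoopRDD(inputFormatClassName: String): Boolean =
    inputFormatClassName == HiveHBaseInputFormat

  def main(args: Array[String]): Unit = {
    assert(useOldHadoopRDD(HiveHBaseInputFormat)) // -> OldHadoopRDD
    assert(!useOldHadoopRDD("org.apache.hadoop.mapreduce.lib.input.TextInputFormat")) // -> NewHadoopRDD
  }
}
```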

Why are the changes needed?

With `NewHadoopRDD` the following exception is raised:

```
java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:253)
  at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:446)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
  at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:806)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:765)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:774)
  ... 47 elided
Caused by: java.lang.IllegalStateException: The input format instance has not been properly initialized. Ensure you call initializeTable either in your constructor or initialize method
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:557)
  at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:248)
  ... 86 more
```

Short summary of the root cause

There are two interfaces:

  • the new `org.apache.hadoop.mapreduce.InputFormat`: provides the one-arg method `getSplits(JobContext context)` (returning `List<InputSplit>`)
  • the old `org.apache.hadoop.mapred.InputFormat`: provides the two-arg method `getSplits(JobConf job, int numSplits)` (returning `InputSplit[]`)

In Hive, both are implemented by `HiveHBaseTableInputFormat`, but only the old method performs the required initialisation; this is why `NewHadoopRDD` fails here (see the toy sketch below).
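As a toy Scala model of the root cause (the names are illustrative, not Hive's real code), consider one class implementing both interface generations, where only the old-style method performs the setup:

```scala
// Toy model: only the old-style getSplits initializes the "table", so callers of
// the new-style method see an uninitialized instance, mirroring the
// HiveHBaseTableInputFormat behaviour described above.
trait OldStyleInputFormat { def getSplits(job: String, numSplits: Int): Array[String] }
trait NewStyleInputFormat { def getSplits(context: String): List[String] }

class BothStyles extends OldStyleInputFormat with NewStyleInputFormat {
  private var table: Option[String] = None

  private def initializeTable(name: String): Unit = { table = Some(name) }

  // Old API: initializes the table before computing splits.
  override def getSplits(job: String, numSplits: Int): Array[String] = {
    initializeTable("hbase_test")
    Array.fill(numSplits)(s"split of ${table.get}")
  }

  // New API: nothing triggers the initialization, so this fails.
  override def getSplits(context: String): List[String] = table match {
    case Some(t) => List(s"split of $t")
    case None => throw new IllegalStateException("The input format instance has not been properly initialized.")
  }
}
```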

Detailed analysis

All links here point to the latest commits on the master branches of the mentioned components at the time of writing this description (so the line numbers stay correct in the future, as `master` itself is a moving target).

Spark's `NewHadoopRDD` uses the new interface, calling the one-arg method:

https://github.com/apache/spark/blob/5556cfc59aa97a3ad4ea0baacebe19859ec0bcb7/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136

```scala
val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
```

Hive, on the other hand, binds the initialisation to the two-arg method coming from the old interface.
See [Hive#getSplits](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L268):

```java
  @Override public InputSplit[] getSplits(final JobConf jobConf, final int numSplits) throws IOException {
```

This calls `getSplitsInternal`, which contains the [initialisation](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L299) too:

```java
    initializeTable(conn, tableName);
```

Interestingly, Hive also uses the one-arg method internally within `getSplitsInternal` [here](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L356), but by then the initialisation has already been done.

By calling the new interface method (which is what `NewHadoopRDD` does), the call goes straight to the HBase method [org.apache.hadoop.hbase.mapreduce.TableInputFormatBase#getSplits](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L230).
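A hedged reproduction sketch of that direct call path (it assumes the Hive HBase handler and HBase are on the classpath; a real run would also need the HBase table settings in the configuration):

```scala
import org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobID
import org.apache.hadoop.mapreduce.task.JobContextImpl

val conf = new JobConf() // a real run would also carry the HBase table settings
val fmt = new HiveHBaseTableInputFormat() // implements both the old and the new InputFormat
// Calling the new-API getSplits directly, as NewHadoopRDD does, reaches
// TableInputFormatBase.getSplits with no table initialization and throws the
// IOException ("Cannot create a record reader ...") shown above.
fmt.getSplits(new JobContextImpl(conf, new JobID()))
```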

There, a `JobContext`-based [initialisation](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L234-L237) would take place,
but by default it is an [empty method](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L628-L640):

```java
  /**
   * Handle subclass specific set up. Each of the entry points used by the MapReduce framework,
   * {@link #createRecordReader(InputSplit, TaskAttemptContext)} and {@link #getSplits(JobContext)},
   * will call {@link #initialize(JobContext)} as a convenient centralized location to handle
   * retrieving the necessary configuration information and calling
   * {@link #initializeTable(Connection, TableName)}. Subclasses should implement their initialize
   * call such that it is safe to call multiple times. The current TableInputFormatBase
   * implementation relies on a non-null table reference to decide if an initialize call is needed,
   * but this behavior may change in the future. In particular, it is critical that initializeTable
   * not be called multiple times since this will leak Connection instances.
   */
  protected void initialize(JobContext context) throws IOException {
  }
```

This method is not overridden by Hive, and it is hard to justify changing that (it is an internal Hive class), so it is easier to fix this in Spark. A hypothetical Hive-side alternative is sketched below for comparison.
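For comparison only, such a Hive-side fix would override that hook; the class below is made up for illustration (it uses the HBase APIs quoted above) and is not what this PR does:

```scala
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableInputFormatBase}
import org.apache.hadoop.mapreduce.JobContext

// Hypothetical subclass: fill in the empty initialize(JobContext) hook so the
// new-API getSplits also sets up the table. Guarded so initializeTable never
// runs twice, since repeated calls would leak HBase Connection instances.
class InitializingTableInputFormat extends TableInputFormatBase {
  private var initialized = false

  override protected def initialize(context: JobContext): Unit = {
    if (!initialized) {
      val conf = context.getConfiguration
      initializeTable(
        ConnectionFactory.createConnection(conf),
        TableName.valueOf(conf.get(TableInputFormat.INPUT_TABLE)))
      initialized = true
    }
  }
}
```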

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  1. create hbase table

```
hbase(main):001:0> create 'hbase_test', 'cf1'
hbase(main):002:0> put 'hbase_test', 'r1', 'cf1:v1', '123'
```

  2. create hive table related to hbase table

hive>

```
CREATE EXTERNAL TABLE `hivetest.hbase_test`(
  `key` string COMMENT '', 
  `value` string COMMENT '')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.hbase.HBaseSerDe' 
STORED BY 
  'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
WITH SERDEPROPERTIES ( 
  'hbase.columns.mapping'=':key,cf1:v1', 
  'serialization.format'='1')
TBLPROPERTIES (
  'hbase.table.name'='hbase_test')
```

 

  3. query the hive table from spark-shell while the data is in HBase

```
scala> spark.sql("select * from hivetest.hbase_test").show()
22/11/05 01:14:16 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
22/11/05 01:14:16 WARN client.HiveClientImpl: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = f05b6866-86df-4d88-9eea-f1c45043bb5f
+---+-----+
|key|value|
+---+-----+
| r1|  123|
+---+-----+
```

github-actions bot added the SQL label on Nov 5, 2022.
attilapiros changed the title from "Initial version" to "[SPARK-32380][SQL] Fixing access of HBase table via Hive" on Nov 5, 2022.
attilapiros (Contributor, Author) commented:
cc @dongjoon-hyun, @HyukjinKwon

attilapiros changed the title from "[SPARK-32380][SQL] Fixing access of HBase table via Hive" to "[SPARK-32380][SQL] Fixing access of HBase table via Hive from Spark" on Nov 5, 2022.
HyukjinKwon (Member) commented:
Merged to master.

HyukjinKwon pushed a commit that referenced this pull request Nov 5, 2022
Closes #38516 from attilapiros/SPARK-32380.

Authored-by: attilapiros <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 7009ef0)
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Nov 5, 2022
HyukjinKwon (Member) commented:
Also merged to branch-3.3 and branch-3.2.

SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
sunchao pushed a commit to sunchao/spark that referenced this pull request Jun 2, 2023