
Commit a84dbcf

Author: hehuiyuan

fix NPE problem when converting Flink object for Map
1 parent 8b2fada commit a84dbcf

File tree

2 files changed: +50 -1 lines changed

flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java

Lines changed: 3 additions & 0 deletions
@@ -332,6 +332,9 @@ public static Object toFlinkObject(ObjectInspector inspector, Object data, HiveS
     if (inspector instanceof MapObjectInspector) {
         MapObjectInspector mapInspector = (MapObjectInspector) inspector;
         Map<?, ?> map = mapInspector.getMap(data);
+        if (map == null) {
+            return new HashMap<>();
+        }
 
         Map<Object, Object> result = new HashMap<>(map.size());
         for (Map.Entry<?, ?> entry : map.entrySet()) {
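
Why the guard is needed: Hive's MapObjectInspector.getMap(Object) can return null when the underlying record carries no map value (the test below ties this scenario to the Hive 2.0.x shim reading a Parquet-backed map column). Before this commit, that null flowed straight into new HashMap<>(map.size()) and threw a NullPointerException. A minimal sketch of the failure mode and the fix, with the inspector wiring stripped away; convertMapBranch is a hypothetical stand-in for the map branch of HiveInspectors.toFlinkObject, which in the real code also recursively converts each key and value:

    import java.util.HashMap;
    import java.util.Map;

    public class NullMapGuardSketch {
        // Hypothetical stand-in for the map branch of HiveInspectors.toFlinkObject.
        static Map<Object, Object> convertMapBranch(Map<?, ?> map) {
            if (map == null) {
                // Patched behavior: surface a null Hive map as an empty Flink map.
                return new HashMap<>();
            }
            // Pre-patch, this line threw NullPointerException when map == null.
            Map<Object, Object> result = new HashMap<>(map.size());
            for (Map.Entry<?, ?> entry : map.entrySet()) {
                result.put(entry.getKey(), entry.getValue());
            }
            return result;
        }

        public static void main(String[] args) {
            System.out.println(convertMapBranch(null)); // prints {} instead of throwing
        }
    }

Returning a fresh empty HashMap rather than null also spares downstream consumers, which iterate over the converted map, from adding their own null checks.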

flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java

Lines changed: 47 additions & 1 deletion
@@ -34,7 +34,6 @@
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.IOUtils;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -52,6 +51,7 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -531,6 +531,52 @@ public void testLocationWithComma() throws Exception {
         }
     }
 
+    @Test
+    public void testReadHiveDataWithEmptyMapForHiveShim20X() throws Exception {
+        TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+
+        try {
+            // Flink to write parquet file
+            String folderURI = TEMPORARY_FOLDER.newFolder().toURI().toString();
+            tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+            String format = "parquet";
+            tableEnv.executeSql(
+                    String.format(
+                            "create table parquet_t (i int, j int) with ("
+                                    + "'connector'='filesystem','format'='%s','path'='%s')",
+                            format, folderURI));
+            tableEnv.executeSql("insert into parquet_t select 1, 2").await();
+            tableEnv.executeSql("drop table parquet_t");
+
+            // Hive to read parquet file and write to another table for empty map data
+            tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+            tableEnv.getConfig()
+                    .getConfiguration()
+                    .set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true);
+            tableEnv.executeSql(
+                    String.format(
+                            "create external table src_t (i int, j int) stored as %s location '%s'",
+                            format, folderURI));
+            tableEnv.executeSql(
+                    String.format(
+                            "create table target_t (a int, b map<string, string>) stored as parquet"));
+
+            tableEnv.executeSql(
+                    "insert into target_t select i, map[cast(null as string), cast(null as string)] from src_t");
+
+            // Hive to read parquet table with empty map value
+            List<Row> results =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.sqlQuery("select * from target_t").execute().collect());
+        } finally {
+            tableEnv.getConfig()
+                    .getConfiguration()
+                    .set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
+            tableEnv.executeSql("drop table if exists target_t");
+            tableEnv.executeSql("drop table if exists src_t");
+        }
+    }
+
     private TableEnvironment getTableEnvWithHiveCatalog() {
         TableEnvironment tableEnv =
                 HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);