Skip to content

Commit 6bb3435

Browse files
Merge branch 'v1.5.0_release'
ysq
2 parents 701e9fc + ad6caa3 commit 6bb3435

File tree

3 files changed

+3
-3
lines changed

3 files changed

+3
-3
lines changed

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public Row deserialize(byte[] message) throws IOException {
9898

9999
try {
100100
numInRecord.inc();
101-
numInBytes.inc(message.length);
101+
if(message!=null){numInBytes.inc(message.length);}
102102

103103
JsonNode root = objectMapper.readTree(message);
104104
Row row = new Row(fieldNames.length);

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public Row deserialize(byte[] message) throws IOException {
9898

9999
try {
100100
numInRecord.inc();
101-
numInBytes.inc(message.length);
101+
if(message!=null){numInBytes.inc(message.length);}
102102

103103
JsonNode root = objectMapper.readTree(message);
104104
Row row = new Row(fieldNames.length);

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/CustomerJsonDeserialization.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public Row deserialize(byte[] message) throws IOException {
100100

101101
try {
102102
numInRecord.inc();
103-
numInBytes.inc(message.length);
103+
if(message!=null){numInBytes.inc(message.length);}
104104

105105
JsonNode root = objectMapper.readTree(message);
106106
Row row = new Row(fieldNames.length);

0 commit comments

Comments
 (0)