diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
index f35a143e00374..306ef10b775a9 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
@@ -30,13 +30,18 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
 
   def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
     rowWriter.reset()
+    rowWriter.zeroOutNullBytes()
 
     if (record.key == null) {
       rowWriter.setNullAt(0)
     } else {
       rowWriter.write(0, record.key)
     }
-    rowWriter.write(1, record.value)
+    if (record.value == null) {
+      rowWriter.setNullAt(1)
+    } else {
+      rowWriter.write(1, record.value)
+    }
     rowWriter.write(2, UTF8String.fromString(record.topic))
     rowWriter.write(3, record.partition)
     rowWriter.write(4, record.offset)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index be0cea212f507..9b3e78c84c34a 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -169,6 +169,10 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
       }
     }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+    testNullableKeyValue(ContinuousTrigger(100))
+  }
 }
 
 class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 21634ae2abfa1..b98f8e97db2e6 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1040,6 +1040,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       q.stop()
     }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+    testNullableKeyValue(Trigger.ProcessingTime(100))
+  }
 }
 
 
@@ -1511,6 +1515,60 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
     )
   }
+
+  protected def testNullableKeyValue(trigger: Trigger): Unit = {
+    val table = "kafka_null_key_value_source_test"
+    withTable(table) {
+      val topic = newTopic()
+      testUtils.createTopic(topic)
+      testUtils.withTranscationalProducer { producer =>
+        val df = spark
+          .readStream
+          .format("kafka")
+          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+          .option("kafka.isolation.level", "read_committed")
+          .option("startingOffsets", "earliest")
+          .option("subscribe", topic)
+          .load()
+          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+          .as[(String, String)]
+
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName(table)
+          .trigger(trigger)
+          .start()
+        try {
+          var idx = 0
+          producer.beginTransaction()
+          val expected1 = Seq.tabulate(5) { _ =>
+            producer.send(new ProducerRecord[String, String](topic, null, null)).get()
+            (null, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected2 = Seq.tabulate(5) { _ =>
+            idx += 1
+            producer.send(new ProducerRecord[String, String](topic, idx.toString, null)).get()
+            (idx.toString, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected3 = Seq.tabulate(5) { _ =>
+            idx += 1
+            producer.send(new ProducerRecord[String, String](topic, null, idx.toString)).get()
+            (null, idx.toString)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          producer.commitTransaction()
+          eventually(timeout(streamingTimeout)) {
+            checkAnswer(spark.table(table), (expected1 ++ expected2 ++ expected3).toDF())
+          }
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
 }
 
 object KafkaSourceSuite {
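
A minimal standalone sketch (not part of the patch) of how a user-facing query sees null keys/values once the converter handles them; the object name, broker address, and topic are hypothetical placeholders, and it assumes a local SparkSession with the spark-sql-kafka-0-10 package on the classpath.

import org.apache.spark.sql.SparkSession

object NullableKafkaRecordsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("NullableKafkaRecordsSketch")
      .getOrCreate()

    // Read the topic as a stream; with the converter change above, records with
    // null keys and/or null values arrive as null columns instead of failing.
    val records = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker
      .option("subscribe", "null_key_value_topic")         // hypothetical topic
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

    // Replace null values (e.g. tombstones from a compacted topic) with a
    // placeholder before writing to the console sink.
    val query = records
      .selectExpr("key", "coalesce(value, '<tombstone>') AS value")
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}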