From a3adc708e133e4eef61810f8d3c4b2a0bc863993 Mon Sep 17 00:00:00 2001
From: uncleGen
Date: Tue, 23 Apr 2019 15:34:24 +0800
Subject: [PATCH 1/6] Null values don't work in Kafka source v2

---
 .../sql/kafka010/KafkaRecordToUnsafeRowConverter.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
index f35a143e00374..c425f2edc7af2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
@@ -36,7 +36,11 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
     } else {
       rowWriter.write(0, record.key)
     }
-    rowWriter.write(1, record.value)
+    if (record.value() == null) {
+      rowWriter.setNullAt(1)
+    } else {
+      rowWriter.write(1, record.value)
+    }
     rowWriter.write(2, UTF8String.fromString(record.topic))
     rowWriter.write(3, record.partition)
     rowWriter.write(4, record.offset)

From 5cb950281e2cd32a16419c649e7a30257797692a Mon Sep 17 00:00:00 2001
From: uncleGen
Date: Tue, 23 Apr 2019 15:37:07 +0800
Subject: [PATCH 2/6] minor update

---
 .../spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
index c425f2edc7af2..60e780acc2518 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
@@ -36,7 +36,7 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
     } else {
       rowWriter.write(0, record.key)
     }
-    if (record.value() == null) {
+    if (record.value == null) {
       rowWriter.setNullAt(1)
     } else {
       rowWriter.write(1, record.value)

From 74bfb87371416dedcd8262818e0d0529aa55c0b9 Mon Sep 17 00:00:00 2001
From: uncleGen
Date: Wed, 24 Apr 2019 12:33:07 +0800
Subject: [PATCH 3/6] add unit test

---
 .../KafkaRecordToUnsafeRowConverter.scala | 2 +
 .../kafka010/KafkaContinuousSourceSuite.scala | 48 +++++++++++++++++++
 .../spark/sql/kafka010/KafkaTestUtils.scala | 11 +++++
 .../expressions/codegen/UnsafeRowWriter.java | 4 ++
 4 files changed, 65 insertions(+)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
index 60e780acc2518..b109b477acb0e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
@@ -30,6 +30,8 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
 
   def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
     rowWriter.reset()
+    rowWriter.unsetNullAt(0)
+    rowWriter.unsetNullAt(1)
 
     if (record.key == null) {
       rowWriter.setNullAt(0)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index be0cea212f507..4160695985d05 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -169,6 +169,54 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
       }
     }
   }
+
+  test("read kafka record containing null key/values") {
+    val table = "kafka_null_key_value_source_test"
+    withTable(table) {
+      val topic = newTopic()
+      testUtils.createTopic(topic)
+      testUtils.withProducer { producer =>
+        val df = spark
+          .readStream
+          .format("kafka")
+          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+          .option("startingOffsets", "earliest")
+          .option("subscribe", topic)
+          .load()
+          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+          .as[(String, String)]
+
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName(table)
+          .trigger(ContinuousTrigger(100))
+          .start()
+        try {
+          val expected1 = (1 to 5).map { _ =>
+            producer.send(new ProducerRecord[String, String](topic, null, null)).get()
+            (null, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected2 = (6 to 10).map { i =>
+            producer.send(new ProducerRecord[String, String](topic, i.toString, null)).get()
+            (i.toString, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected3 = (11 to 15).map { i =>
+            producer.send(new ProducerRecord[String, String](topic, null, i.toString)).get()
+            (null, i.toString)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          eventually(timeout(streamingTimeout)) {
+            checkAnswer(spark.table(table), (expected1 ++ expected2 ++ expected3).toDF())
+          }
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
 }
 
 class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index f2e4ee71450e6..404fd7bf8bc36 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -350,6 +350,17 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
     props
   }
 
+  /** Call `f` with a `KafkaProducer`. */
+  def withProducer(f: KafkaProducer[String, String] => Unit): Unit = {
+    val props = producerConfiguration
+    val producer = new KafkaProducer[String, String](props)
+    try {
+      f(producer)
+    } finally {
+      producer.close()
+    }
+  }
+
   /** Call `f` with a `KafkaProducer` that has initialized transactions. */
   def withTranscationalProducer(f: KafkaProducer[String, String] => Unit): Unit = {
     val props = producerConfiguration
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
index 582882374e183..73af6965f0e1c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
@@ -111,6 +111,10 @@ public void setNullAt(int ordinal) {
     write(ordinal, 0L);
   }
 
+  public void unsetNullAt(int ordinal) {
+    BitSetMethods.unset(getBuffer(), startingOffset, ordinal);
+  }
+
   @Override
   public void setNull1Bytes(int ordinal) {
     setNullAt(ordinal);

From 45330de4af7a4895f76aeab2a6feb355fe15aeff Mon Sep 17 00:00:00 2001
From: uncleGen
Date: Wed, 24 Apr 2019 14:37:47 +0800
Subject: [PATCH 4/6] fix comments

---
 .../KafkaRecordToUnsafeRowConverter.scala | 3 +-
 .../kafka010/KafkaContinuousSourceSuite.scala | 48 +----------------
 .../kafka010/KafkaMicroBatchSourceSuite.scala | 52 +++++++++++++++++++
 .../expressions/codegen/UnsafeRowWriter.java | 4 --
 4 files changed, 55 insertions(+), 52 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
index b109b477acb0e..306ef10b775a9 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
@@ -30,8 +30,7 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
 
   def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
     rowWriter.reset()
-    rowWriter.unsetNullAt(0)
-    rowWriter.unsetNullAt(1)
+    rowWriter.zeroOutNullBytes()
 
     if (record.key == null) {
       rowWriter.setNullAt(0)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index 4160695985d05..a92b448631699 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -170,52 +170,8 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
     }
   }
 
-  test("read kafka record containing null key/values") {
-    val table = "kafka_null_key_value_source_test"
-    withTable(table) {
-      val topic = newTopic()
-      testUtils.createTopic(topic)
-      testUtils.withProducer { producer =>
-        val df = spark
-          .readStream
-          .format("kafka")
-          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-          .option("startingOffsets", "earliest")
-          .option("subscribe", topic)
-          .load()
-          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-          .as[(String, String)]
-
-        val q = df
-          .writeStream
-          .format("memory")
-          .queryName(table)
-          .trigger(ContinuousTrigger(100))
-          .start()
-        try {
-          val expected1 = (1 to 5).map { _ =>
-            producer.send(new ProducerRecord[String, String](topic, null, null)).get()
-            (null, null)
-          }.asInstanceOf[Seq[(String, String)]]
-
-          val expected2 = (6 to 10).map { i =>
-            producer.send(new ProducerRecord[String, String](topic, i.toString, null)).get()
-            (i.toString, null)
-          }.asInstanceOf[Seq[(String, String)]]
-
-          val expected3 = (11 to 15).map { i =>
-            producer.send(new ProducerRecord[String, String](topic, null, i.toString)).get()
-            (null, i.toString)
-          }.asInstanceOf[Seq[(String, String)]]
-
-          eventually(timeout(streamingTimeout)) {
-            checkAnswer(spark.table(table), (expected1 ++ expected2 ++ expected3).toDF())
-          }
-        } finally {
-          q.stop()
-        }
-      }
-    }
+  test(s"SPARK-27494: read kafka record containing null key/values in continuous mode") {
+    testNullableKeyValue(ContinuousTrigger(100))
   }
 }
 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 21634ae2abfa1..21e57e345d5ea 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1040,6 +1040,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       q.stop()
     }
   }
+
+  test(s"SPARK-27494: read kafka record containing null key/values in micro-batch mode") {
+    testNullableKeyValue(Trigger.ProcessingTime(100))
+  }
 }
 
 
@@ -1511,6 +1515,54 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
     )
   }
+
+  protected def testNullableKeyValue(trigger: Trigger): Unit = {
+    val table = "kafka_null_key_value_source_test"
+    withTable(table) {
+      val topic = newTopic()
+      testUtils.createTopic(topic)
+      testUtils.withProducer { producer =>
+        val df = spark
+          .readStream
+          .format("kafka")
+          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+          .option("startingOffsets", "earliest")
+          .option("subscribe", topic)
+          .load()
+          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+          .as[(String, String)]
+
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName(table)
+          .trigger(trigger)
+          .start()
+        try {
+          val expected1 = (1 to 5).map { _ =>
+            producer.send(new ProducerRecord[String, String](topic, null, null)).get()
+            (null, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected2 = (6 to 10).map { i =>
+            producer.send(new ProducerRecord[String, String](topic, i.toString, null)).get()
+            (i.toString, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected3 = (11 to 15).map { i =>
+            producer.send(new ProducerRecord[String, String](topic, null, i.toString)).get()
+            (null, i.toString)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          eventually(timeout(streamingTimeout)) {
+            checkAnswer(spark.table(table), (expected1 ++ expected2 ++ expected3).toDF())
+          }
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
 }
 
 object KafkaSourceSuite {
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
index 73af6965f0e1c..582882374e183 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
@@ -111,10 +111,6 @@ public void setNullAt(int ordinal) {
     write(ordinal, 0L);
   }
 
-  public void unsetNullAt(int ordinal) {
-    BitSetMethods.unset(getBuffer(), startingOffset, ordinal);
-  }
-
   @Override
   public void setNull1Bytes(int ordinal) {
     setNullAt(ordinal);

From d91977d22c975fe9e02d4b40e8b2193d06bb0643 Mon Sep 17 00:00:00 2001
From: uncleGen
Date: Thu, 25 Apr 2019 12:50:24 +0800
Subject: [PATCH 5/6] fix comments

---
 .../apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala | 2 +-
 .../apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index a92b448631699..c1ae0936363c5 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -170,7 +170,7 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
     }
   }
 
-  test(s"SPARK-27494: read kafka record containing null key/values in continuous mode") {
+  test("SPARK-27494: read kafka record containing null key/values in continuous mode") {
     testNullableKeyValue(ContinuousTrigger(100))
   }
 }
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 21e57e345d5ea..5ebe59d8dfaa8 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1041,7 +1041,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
-  test(s"SPARK-27494: read kafka record containing null key/values in micro-batch mode") {
+  test("SPARK-27494: read kafka record containing null key/values in micro-batch mode") {
     testNullableKeyValue(Trigger.ProcessingTime(100))
   }
 }

From 174321a41cdd05d9aa5b39f0f2e22294119b1fd0 Mon Sep 17 00:00:00 2001
From: uncleGen
Date: Fri, 26 Apr 2019 10:05:06 +0800
Subject: [PATCH 6/6] fix comments

---
 .../kafka010/KafkaContinuousSourceSuite.scala | 2 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala | 24 ++++++++++++-------
 .../spark/sql/kafka010/KafkaTestUtils.scala | 11 ---------
 3 files changed, 16 insertions(+), 21 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index c1ae0936363c5..9b3e78c84c34a 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -170,7 +170,7 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
     }
   }
 
-  test("SPARK-27494: read kafka record containing null key/values in continuous mode") {
+  test("SPARK-27494: read kafka record containing null key/values.") {
    testNullableKeyValue(ContinuousTrigger(100))
   }
 }
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 5ebe59d8dfaa8..b98f8e97db2e6 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -1041,7 +1041,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
-  test("SPARK-27494: read kafka record containing null key/values in micro-batch mode") {
+  test("SPARK-27494: read kafka record containing null key/values.") {
     testNullableKeyValue(Trigger.ProcessingTime(100))
   }
 }
@@ -1521,11 +1521,12 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
     withTable(table) {
       val topic = newTopic()
       testUtils.createTopic(topic)
-      testUtils.withProducer { producer =>
+      testUtils.withTranscationalProducer { producer =>
        val df = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+          .option("kafka.isolation.level", "read_committed")
          .option("startingOffsets", "earliest")
          .option("subscribe", topic)
          .load()
@@ -1539,21 +1540,26 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
          .trigger(trigger)
          .start()
        try {
-          val expected1 = (1 to 5).map { _ =>
+          var idx = 0
+          producer.beginTransaction()
+          val expected1 = Seq.tabulate(5) { _ =>
            producer.send(new ProducerRecord[String, String](topic, null, null)).get()
            (null, null)
          }.asInstanceOf[Seq[(String, String)]]
 
-          val expected2 = (6 to 10).map { i =>
-            producer.send(new ProducerRecord[String, String](topic, i.toString, null)).get()
-            (i.toString, null)
+          val expected2 = Seq.tabulate(5) { _ =>
+            idx += 1
+            producer.send(new ProducerRecord[String, String](topic, idx.toString, null)).get()
+            (idx.toString, null)
          }.asInstanceOf[Seq[(String, String)]]
 
-          val expected3 = (11 to 15).map { i =>
-            producer.send(new ProducerRecord[String, String](topic, null, i.toString)).get()
-            (null, i.toString)
+          val expected3 = Seq.tabulate(5) { _ =>
+            idx += 1
+            producer.send(new ProducerRecord[String, String](topic, null, idx.toString)).get()
+            (null, idx.toString)
          }.asInstanceOf[Seq[(String, String)]]
+          producer.commitTransaction()
 
          eventually(timeout(streamingTimeout)) {
            checkAnswer(spark.table(table), (expected1 ++ expected2 ++ expected3).toDF())
          }
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 404fd7bf8bc36..f2e4ee71450e6 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -350,17 +350,6 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
     props
   }
 
-  /** Call `f` with a `KafkaProducer`. */
-  def withProducer(f: KafkaProducer[String, String] => Unit): Unit = {
-    val props = producerConfiguration
-    val producer = new KafkaProducer[String, String](props)
-    try {
-      f(producer)
-    } finally {
-      producer.close()
-    }
-  }
-
   /** Call `f` with a `KafkaProducer` that has initialized transactions. */
   def withTranscationalProducer(f: KafkaProducer[String, String] => Unit): Unit = {
     val props = producerConfiguration
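
Note on the final shape of the fix: KafkaRecordToUnsafeRowConverter reuses a single UnsafeRowWriter for every record, and reset() by itself does not clear null bits left over from an earlier record, which is why PATCH 4 drops the ad-hoc unsetNullAt helper in favour of the existing zeroOutNullBytes() call right after reset(). The sketch below illustrates that pattern in isolation; it is not part of the patches above. It assumes spark-catalyst and spark-unsafe are on the classpath, and the two-column layout and the NullBitsSketch object name are made up for the example.

import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.unsafe.types.UTF8String

object NullBitsSketch {
  def main(args: Array[String]): Unit = {
    // A single writer reused for many rows, as KafkaRecordToUnsafeRowConverter does.
    val rowWriter = new UnsafeRowWriter(2)

    // Row 1: the value column (ordinal 1) is null, so its null bit is set.
    rowWriter.reset()
    rowWriter.zeroOutNullBytes()
    rowWriter.write(0, UTF8String.fromString("key-1"))
    rowWriter.setNullAt(1)
    println(rowWriter.getRow().isNullAt(1)) // expected: true

    // Row 2: both columns are present. zeroOutNullBytes() clears the bit left
    // over from row 1; without it the value written here could still read back
    // as null, which is the follow-up problem the later patches guard against.
    rowWriter.reset()
    rowWriter.zeroOutNullBytes()
    rowWriter.write(0, UTF8String.fromString("key-2"))
    rowWriter.write(1, UTF8String.fromString("value-2"))
    println(rowWriter.getRow().isNullAt(1)) // expected: false
  }
}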