From ff036d31c6adbc2cd5f2c9347c267073b673167b Mon Sep 17 00:00:00 2001 From: Kalpit Shah Date: Fri, 25 Apr 2014 10:44:30 -0700 Subject: [PATCH 1/3] SPARK-1630: Make PythonRDD handle Null elements and strings gracefully --- .../apache/spark/api/python/PythonRDD.scala | 20 ++++++++++++++----- .../spark/api/python/PythonRDDSuite.scala | 5 +++++ 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 672c344a56597..3f5b9e66698bd 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -245,7 +245,7 @@ private object SpecialLengths { val TIMING_DATA = -3 } -private[spark] object PythonRDD { +private[spark] object PythonRDD extends Logging { val UTF8 = Charset.forName("UTF-8") def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int): @@ -301,15 +301,25 @@ private[spark] object PythonRDD { throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass) } case other => - throw new SparkException("Unexpected element type " + first.getClass) + Option(other) match { + case None => + logDebug("Encountered NULL element from iterator. We skip writing NULL to stream.") + case Some(x) => + throw new SparkException("Unexpected element type " + first.getClass) + } } } } def writeUTF(str: String, dataOut: DataOutputStream) { - val bytes = str.getBytes(UTF8) - dataOut.writeInt(bytes.length) - dataOut.write(bytes) + Option(str) match { + case None => + logDebug("Encountered NULL string. We skip writing NULL to stream.") + case Some(x) => + val bytes = x.getBytes(UTF8) + dataOut.writeInt(bytes.length) + dataOut.write(bytes) + } } def writeToFile[T](items: java.util.Iterator[T], filename: String) { diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala index 7b866f08a0e9f..d47fcf0111037 100644 --- a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala @@ -29,5 +29,10 @@ class PythonRDDSuite extends FunSuite { PythonRDD.writeIteratorToStream(input.iterator, buffer) } + test("Handle nulls gracefully") { + val input: List[String] = List("a",null) + val buffer = new DataOutputStream(new ByteArrayOutputStream) + PythonRDD.writeIteratorToStream(input.iterator, buffer) + } } From 8a4a0f94d34b76b44b590ca741b438393b803106 Mon Sep 17 00:00:00 2001 From: Kalpit Shah Date: Fri, 25 Apr 2014 11:24:41 -0700 Subject: [PATCH 2/3] SPARK-1630: Incorporated code-review feedback --- .../apache/spark/api/python/PythonRDD.scala | 18 ++++++++---------- .../spark/api/python/PythonRDDSuite.scala | 2 +- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 3f5b9e66698bd..29a0f7889659c 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -301,22 +301,20 @@ private[spark] object PythonRDD extends Logging { throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass) } case other => - Option(other) match { - case None => - logDebug("Encountered NULL element from iterator. We skip writing NULL to stream.") - case Some(x) => - throw new SparkException("Unexpected element type " + first.getClass) + if (other == null) { + logDebug("Encountered NULL element from iterator. We skip writing NULL to stream.") + } else { + throw new SparkException("Unexpected element type " + first.getClass) } } } } def writeUTF(str: String, dataOut: DataOutputStream) { - Option(str) match { - case None => - logDebug("Encountered NULL string. We skip writing NULL to stream.") - case Some(x) => - val bytes = x.getBytes(UTF8) + if (str == null) { + logDebug("Encountered NULL string. We skip writing NULL to stream.") + } else { + val bytes = str.getBytes(UTF8) dataOut.writeInt(bytes.length) dataOut.write(bytes) } diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala index d47fcf0111037..3304384432c70 100644 --- a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala @@ -30,7 +30,7 @@ class PythonRDDSuite extends FunSuite { } test("Handle nulls gracefully") { - val input: List[String] = List("a",null) + val input: List[String] = List("a", null) val buffer = new DataOutputStream(new ByteArrayOutputStream) PythonRDD.writeIteratorToStream(input.iterator, buffer) } From dddda9e2858d518c916b60972a2ba0a025b38855 Mon Sep 17 00:00:00 2001 From: Kalpit Shah Date: Fri, 25 Apr 2014 11:59:50 -0700 Subject: [PATCH 3/3] SPARK-1630: Fixed indentation --- .../spark/api/python/PythonRDDSuite.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala index 3304384432c70..c62f341441e53 100644 --- a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala @@ -23,16 +23,16 @@ import org.scalatest.FunSuite class PythonRDDSuite extends FunSuite { - test("Writing large strings to the worker") { - val input: List[String] = List("a"*100000) - val buffer = new DataOutputStream(new ByteArrayOutputStream) - PythonRDD.writeIteratorToStream(input.iterator, buffer) - } + test("Writing large strings to the worker") { + val input: List[String] = List("a"*100000) + val buffer = new DataOutputStream(new ByteArrayOutputStream) + PythonRDD.writeIteratorToStream(input.iterator, buffer) + } - test("Handle nulls gracefully") { - val input: List[String] = List("a", null) - val buffer = new DataOutputStream(new ByteArrayOutputStream) - PythonRDD.writeIteratorToStream(input.iterator, buffer) - } + test("Handle nulls gracefully") { + val input: List[String] = List("a", null) + val buffer = new DataOutputStream(new ByteArrayOutputStream) + PythonRDD.writeIteratorToStream(input.iterator, buffer) + } }