From 18e4ddc09e021b9f6115346740165fb7b5a8cb94 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Wed, 11 Feb 2015 16:36:50 +0800
Subject: [PATCH 1/6] [SQL] Reuse mutable row for each record at jsonStringToRow

---
 .../scala/org/apache/spark/sql/json/JsonRDD.scala | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 3b8dde1823370..0c5fe0db1420f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -38,7 +38,19 @@ private[sql] object JsonRDD extends Logging {
       json: RDD[String],
       schema: StructType,
       columnNameOfCorruptRecords: String): RDD[Row] = {
-    parseJson(json, columnNameOfCorruptRecords).map(parsed => asRow(parsed, schema))
+    // Reuse the mutable row for each record, however we still need to
+    // create a new row for every nested struct type in each record
+    val mutableRow = new SpecificMutableRow(schema.fields.map(_.dataType))
+    parseJson(json, columnNameOfCorruptRecords).mapPartitions( iter => {
+      iter.map { parsed =>
+        schema.fields.zipWithIndex.foreach {
+          case (StructField(name, dataType, _, _), i) =>
+            mutableRow.update(i, parsed.get(name).flatMap(v => Option(v)).map(
+              enforceCorrectType(_, dataType)).orNull)
+        }
+        mutableRow: Row
+      }
+    })
   }
 
   private[sql] def inferSchema(

From 2f001ba8029e983f139b36be0cf69718da48c966 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Thu, 12 Feb 2015 19:52:45 +0800
Subject: [PATCH 2/6] Use mutable rows for inner structures

---
 .../org/apache/spark/sql/json/JsonRDD.scala | 45 ++++++++++---------
 1 file changed, 23 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 0c5fe0db1420f..35efcf3bb5c7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -38,19 +38,11 @@ private[sql] object JsonRDD extends Logging {
       json: RDD[String],
       schema: StructType,
       columnNameOfCorruptRecords: String): RDD[Row] = {
-    // Reuse the mutable row for each record, however we still need to
-    // create a new row for every nested struct type in each record
-    val mutableRow = new SpecificMutableRow(schema.fields.map(_.dataType))
-    parseJson(json, columnNameOfCorruptRecords).mapPartitions( iter => {
-      iter.map { parsed =>
-        schema.fields.zipWithIndex.foreach {
-          case (StructField(name, dataType, _, _), i) =>
-            mutableRow.update(i, parsed.get(name).flatMap(v => Option(v)).map(
-              enforceCorrectType(_, dataType)).orNull)
-        }
-        mutableRow: Row
-      }
-    })
+    // Reuse the mutable row for each record and all inner nested structures
+    parseJson(json, columnNameOfCorruptRecords).mapPartitions {
+      val row = new GenericMutableRow(schema.fields.length)
+      iter => iter.map(parsed => asRow(parsed, schema, row))
+    }
   }
 
   private[sql] def inferSchema(
@@ -413,7 +405,7 @@ private[sql] object JsonRDD extends Logging {
     }
   }
 
-  private[json] def enforceCorrectType(value: Any, desiredType: DataType): Any ={
+  private[json] def enforceCorrectType(value: Any, desiredType: DataType, slot: Any = null): Any ={
     if (value == null) {
       null
     } else {
@@ -428,20 +420,29 @@ private[sql] object JsonRDD extends Logging {
         case NullType => null
         case ArrayType(elementType, _) =>
           value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
-        case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct)
+        case struct: StructType =>
+          asRow(value.asInstanceOf[Map[String, Any]], struct, slot.asInstanceOf[GenericMutableRow])
         case DateType => toDate(value)
         case TimestampType => toTimestamp(value)
       }
     }
   }
 
-  private def asRow(json: Map[String,Any], schema: StructType): Row = {
-    // TODO: Reuse the row instead of creating a new one for every record.
-    val row = new GenericMutableRow(schema.fields.length)
-    schema.fields.zipWithIndex.foreach {
-      case (StructField(name, dataType, _, _), i) =>
-        row.update(i, json.get(name).flatMap(v => Option(v)).map(
-          enforceCorrectType(_, dataType)).orNull)
+  private def asRow(
+      json: Map[String,Any],
+      schema: StructType,
+      mutable: GenericMutableRow = null): Row = {
+    val row = if (mutable == null) {
+      new GenericMutableRow(schema.fields.length)
+    } else {
+      mutable
+    }
+
+    for(i <- 0 until schema.fields.length) {
+      val fieldName = schema.fields(i).name
+      val fieldType = schema.fields(i).dataType
+      row.update(i, json.get(fieldName).flatMap(v => Option(v)).map(
+        enforceCorrectType(_, fieldType, row(i))).orNull)
     }
 
     row

From 837785a20f47c5c5cc1929df79b6356fb3ac7b41 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Sun, 15 Feb 2015 17:11:42 +0800
Subject: [PATCH 3/6] Use mutable row arrays for inner arrays

---
 .../scala/org/apache/spark/sql/json/JsonRDD.scala | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 35efcf3bb5c7d..e4ccfeac53f28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -418,8 +418,15 @@ private[sql] object JsonRDD extends Logging {
         case DecimalType() => toDecimal(value)
         case BooleanType => value.asInstanceOf[BooleanType.JvmType]
         case NullType => null
-        case ArrayType(elementType, _) =>
-          value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
+        case ArrayType(elementType, _) => {
+          val arrayLength = value.asInstanceOf[Seq[Any]].length
+          val arraySlot = if (slot == null) {
+            (new Array[Any](arrayLength)).toSeq
+          } else {
+            slot.asInstanceOf[Seq[Any]]
+          }
+          value.asInstanceOf[Seq[Any]].zip(arraySlot).map{case (v, s) => enforceCorrectType(v, elementType,s)}
+        }
         case struct: StructType =>
           asRow(value.asInstanceOf[Map[String, Any]], struct, slot.asInstanceOf[GenericMutableRow])
         case DateType => toDate(value)

From d97d7dbf1f917786d64382a1c79972d32e372930 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Sun, 15 Feb 2015 17:21:13 +0800
Subject: [PATCH 4/6] keep scala style

---
 .../src/main/scala/org/apache/spark/sql/json/JsonRDD.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index e4ccfeac53f28..b94035fc6d1f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -425,7 +425,9 @@ private[sql] object JsonRDD extends Logging {
           } else {
             slot.asInstanceOf[Seq[Any]]
           }
-          value.asInstanceOf[Seq[Any]].zip(arraySlot).map{case (v, s) => enforceCorrectType(v, elementType,s)}
+          value.asInstanceOf[Seq[Any]].zip(arraySlot).map {
+            case (v, s) => enforceCorrectType(v, elementType,s)
+          }
         }
         case struct: StructType =>
           asRow(value.asInstanceOf[Map[String, Any]], struct, slot.asInstanceOf[GenericMutableRow])

From 2d45c68f4c61408ec650fba3214a19446d96ce37 Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Sun, 15 Feb 2015 23:13:09 +0800
Subject: [PATCH 5/6] fix array reuse issue

---
 .../scala/org/apache/spark/sql/json/JsonRDD.scala | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index b94035fc6d1f6..505c02db8b910 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -420,14 +420,14 @@ private[sql] object JsonRDD extends Logging {
         case NullType => null
         case ArrayType(elementType, _) => {
           val arrayLength = value.asInstanceOf[Seq[Any]].length
-          val arraySlot = if (slot == null) {
-            (new Array[Any](arrayLength)).toSeq
-          } else {
+          val arraySlot = if (slot != null && slot.asInstanceOf[Seq[Any]].size == arrayLength) {
             slot.asInstanceOf[Seq[Any]]
+          } else {
+            (new Array[Any](arrayLength)).toSeq
           }
           value.asInstanceOf[Seq[Any]].zip(arraySlot).map {
             case (v, s) => enforceCorrectType(v, elementType,s)
-          }
+          }.toList
         }
         case struct: StructType =>
           asRow(value.asInstanceOf[Map[String, Any]], struct, slot.asInstanceOf[GenericMutableRow])
@@ -441,10 +441,11 @@
       json: Map[String,Any],
       schema: StructType,
       mutable: GenericMutableRow = null): Row = {
-    val row = if (mutable == null) {
-      new GenericMutableRow(schema.fields.length)
-    } else {
+
+    val row = if (mutable != null && mutable.length == schema.fields.length) {
       mutable
+    } else {
+      new GenericMutableRow(schema.fields.length)
     }
 
     for(i <- 0 until schema.fields.length) {

From 2286ac550a16ac3c1c2c7dfaf9b8b20924bdf56a Mon Sep 17 00:00:00 2001
From: Yanbo Liang
Date: Sun, 15 Feb 2015 23:19:02 +0800
Subject: [PATCH 6/6] keep scala style

---
 .../src/main/scala/org/apache/spark/sql/json/JsonRDD.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 505c02db8b910..4e3583a94474e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -405,7 +405,8 @@ private[sql] object JsonRDD extends Logging {
     }
   }
 
-  private[json] def enforceCorrectType(value: Any, desiredType: DataType, slot: Any = null): Any ={
+  private[json] def enforceCorrectType(
+      value: Any, desiredType: DataType, slot: Any = null): Any = {
     if (value == null) {
       null
     } else {
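
A note on the end state of this series: after patch 6, jsonStringToRow hands back one
GenericMutableRow per partition and mutates it in place for every record, reusing the
nested struct rows and array slots as well. That removes per-record allocation, but it
means each returned row is only valid until the iterator advances: a consumer that
buffers the rows without copying them will see every buffered element turn into the
last record. The following is a minimal, self-contained sketch of that aliasing hazard;
it has no Spark dependency, and MutableRow here is a hypothetical stand-in for
GenericMutableRow, not Spark's class.

    // Sketch (assumed names, no Spark dependency) of why reusing one mutable
    // row per partition is only safe when consumers copy before buffering.
    object MutableRowReuseSketch {
      // Stand-in for GenericMutableRow: a fixed-width, mutable value holder.
      final class MutableRow(val length: Int) {
        private val values = new Array[Any](length)
        def update(i: Int, v: Any): Unit = values(i) = v
        def apply(i: Int): Any = values(i)
        def copy(): List[Any] = values.toList // immutable snapshot of the slots
      }

      def main(args: Array[String]): Unit = {
        // A fresh "parsed records" iterator on each call, like one partition's input.
        def parsed = Iterator(Map("a" -> 1), Map("a" -> 2), Map("a" -> 3))
        val row = new MutableRow(1) // one row reused for the whole partition

        // Buffering the reused row directly: all three list elements alias the
        // same object, so every one of them shows the final record's value.
        val aliased = parsed.map { m => row.update(0, m("a")); row }.toList
        println(aliased.map(_(0))) // List(3, 3, 3)

        // Copying before buffering keeps each record independent.
        val copied = parsed.map { m => row.update(0, m("a")); row.copy() }.toList
        println(copied) // List(List(1), List(2), List(3))
      }
    }

The same contract applies inside Spark: operators that buffer rows coming from a
row-reusing child are expected to copy() them first, so reuse like this trades a
little consumer-side discipline for a large reduction in allocation churn.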