
Commit 4352a2f

yhuai authored and marmbrus committed
[SPARK-2376][SQL] Selecting list values inside nested JSON objects raises java.lang.IllegalArgumentException
JIRA: https://issues.apache.org/jira/browse/SPARK-2376

Author: Yin Huai <[email protected]>

Closes #1320 from yhuai/SPARK-2376 and squashes the following commits:

0107417 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-2376
480803d [Yin Huai] Correctly handling JSON arrays in PySpark.
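To make the affected scenario concrete, here is a minimal PySpark sketch (the SparkContext `sc`, SQLContext `sqlCtx`, and the sample JSON strings below are illustrative assumptions, not part of the patch):

json = sc.parallelize([
    '{"field1": 1, "field3": {"field4": 11, "field5": [10, 11]}, "field6": [{"field7": "row1"}]}',
    '{"field1": 2, "field3": {"field4": 22, "field5": []}}'])
srdd = sqlCtx.jsonRDD(json)
sqlCtx.registerRDDAsTable(srdd, "table1")
# field3 contains the list "field5" and field6 is an array of objects; before this
# fix, collecting rows whose inferred schema contains an ArrayType failed with
# java.lang.IllegalArgumentException. Afterwards, JSON arrays come back as Python lists.
rows = sqlCtx.sql("SELECT field1, field3, field6 FROM table1").collect()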
1 parent f0496ee commit 4352a2f

File tree

2 files changed (+44, -25 lines)


python/pyspark/sql.py

Lines changed: 14 additions & 10 deletions
@@ -152,10 +152,12 @@ def jsonFile(self, path):
         >>> ofn.close()
         >>> srdd = sqlCtx.jsonFile(jsonFile)
         >>> sqlCtx.registerRDDAsTable(srdd, "table1")
-        >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
-        >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
-        ... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
-        ... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
+        >>> srdd2 = sqlCtx.sql(
+        ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 from table1")
+        >>> srdd2.collect() == [
+        ... {"f1":1, "f2":"row1", "f3":{"field4":11, "field5": None}, "f4":None},
+        ... {"f1":2, "f2":None, "f3":{"field4":22, "field5": [10, 11]}, "f4":[{"field7": "row2"}]},
+        ... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}]
         True
         """
         jschema_rdd = self._ssql_ctx.jsonFile(path)
@@ -167,10 +169,12 @@ def jsonRDD(self, rdd):
 
         >>> srdd = sqlCtx.jsonRDD(json)
         >>> sqlCtx.registerRDDAsTable(srdd, "table1")
-        >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2, field3 as f3 from table1")
-        >>> srdd2.collect() == [{"f1": 1, "f2": "row1", "f3":{"field4":11}},
-        ... {"f1": 2, "f2": "row2", "f3":{"field4":22}},
-        ... {"f1": 3, "f2": "row3", "f3":{"field4":33}}]
+        >>> srdd2 = sqlCtx.sql(
+        ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 from table1")
+        >>> srdd2.collect() == [
+        ... {"f1":1, "f2":"row1", "f3":{"field4":11, "field5": None}, "f4":None},
+        ... {"f1":2, "f2":None, "f3":{"field4":22, "field5": [10, 11]}, "f4":[{"field7": "row2"}]},
+        ... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}]
         True
         """
         def func(split, iterator):
@@ -492,8 +496,8 @@ def _test():
     globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
         {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
     jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
-       '{"field1" : 2, "field2": "row2", "field3":{"field4":22}}',
-       '{"field1" : 3, "field2": "row3", "field3":{"field4":33}}']
+       '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]}, "field6":[{"field7": "row2"}]}',
+       '{"field1" : null, "field2": "row3", "field3":{"field4":33, "field5": []}}']
     globs['jsonStrings'] = jsonStrings
     globs['json'] = sc.parallelize(jsonStrings)
     globs['nestedRdd1'] = sc.parallelize([
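As a side note on the new test data: the second JSON string omits "field2" and the third sets "field1" to null, so the doctests above also pin down how missing or null values surface as None in Python, while JSON arrays become plain lists. A stand-alone illustration with the standard json module (no Spark required):

import json

# The second new test record: "field2" is absent and "field6" is an array of objects.
record = json.loads(
    '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]}, "field6":[{"field7": "row2"}]}')
print(record.get("field2"))  # None: a missing field, like a JSON null, maps to None
print(record["field6"])      # [{'field7': 'row2'}]: JSON arrays parse to Python lists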

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 30 additions & 15 deletions
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql
 
+import java.util.{Map => JMap, List => JList, Set => JSet}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
 import net.razorvine.pickle.Pickler
 
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
@@ -27,10 +32,9 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.catalyst.types.{DataType, StructType, BooleanType}
+import org.apache.spark.sql.catalyst.types.{ArrayType, BooleanType, StructType}
 import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
 import org.apache.spark.api.java.JavaRDD
-import java.util.{Map => JMap}
 
 /**
  * :: AlphaComponent ::
@@ -359,29 +363,40 @@ class SchemaRDD(
         case (obj, (name, dataType)) =>
           dataType match {
             case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
+            case array @ ArrayType(struct: StructType) =>
+              val arrayValues = obj match {
+                case seq: Seq[Any] =>
+                  seq.map(element => rowToMap(element.asInstanceOf[Row], struct)).asJava
+                case list: JList[Any] =>
+                  list.map(element => rowToMap(element.asInstanceOf[Row], struct))
+                case set: JSet[Any] =>
+                  set.map(element => rowToMap(element.asInstanceOf[Row], struct))
+                case array if array != null && array.getClass.isArray =>
+                  array.asInstanceOf[Array[Any]].map {
+                    element => rowToMap(element.asInstanceOf[Row], struct)
+                  }
+                case other => other
+              }
+              map.put(name, arrayValues)
+            case array: ArrayType => {
+              val arrayValues = obj match {
+                case seq: Seq[Any] => seq.asJava
+                case other => other
+              }
+              map.put(name, arrayValues)
+            }
             case other => map.put(name, obj)
           }
       }
 
       map
     }
 
-    // TODO: Actually, the schema of a row should be represented by a StructType instead of
-    // a Seq[Attribute]. Once we have finished that change, we can just use rowToMap to
-    // construct the Map for python.
-    val fields: Seq[(String, DataType)] = this.queryExecution.analyzed.output.map(
-      field => (field.name, field.dataType))
+    val rowSchema = StructType.fromAttributes(this.queryExecution.analyzed.output)
     this.mapPartitions { iter =>
       val pickle = new Pickler
       iter.map { row =>
-        val map: JMap[String, Any] = new java.util.HashMap
-        row.zip(fields).foreach { case (obj, (name, dataType)) =>
-          dataType match {
-            case struct: StructType => map.put(name, rowToMap(obj.asInstanceOf[Row], struct))
-            case other => map.put(name, obj)
-          }
-        }
-        map
+        rowToMap(row, rowSchema)
       }.grouped(10).map(batched => pickle.dumps(batched.toArray))
     }
   }
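The substance of the Scala change is that rowToMap now recurses into ArrayType fields, converting each struct element the same way a top-level struct is converted, and javaToPython builds one schema via StructType.fromAttributes so the same helper handles the whole row. A rough Python analogue of that recursion, using a made-up tuple-based schema purely for illustration (the real conversion happens on the JVM side before pickling):

# Illustrative schema encoding (not Spark's): a struct is ("struct", [(name, type), ...]),
# an array is ("array", element_type), and anything else is treated as a primitive.
def row_to_map(row, struct_fields):
    result = {}
    for value, (name, data_type) in zip(row, struct_fields):
        result[name] = convert(value, data_type)
    return result

def convert(value, data_type):
    if value is None:
        return None
    kind = data_type[0] if isinstance(data_type, tuple) else "primitive"
    if kind == "struct":
        return row_to_map(value, data_type[1])
    if kind == "array":
        # The fix in spirit: convert each element instead of handing the raw
        # sequence straight to the pickler, which is what used to blow up.
        return [convert(element, data_type[1]) for element in value]
    return value

schema = [("field1", "int"),
          ("field6", ("array", ("struct", [("field7", "string")])))]
print(row_to_map((2, [("row2",)]), schema))  # {'field1': 2, 'field6': [{'field7': 'row2'}]}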
