
Commit fe33121

marmbrus authored and yhuai committed
[SPARK-17699] Support for parsing JSON string columns
Spark SQL has great support for reading text files that contain JSON data. However, in many cases the JSON data is just one column amongst others. This is particularly true when reading from sources such as Kafka. This PR adds a new function `from_json` that converts a string column into a nested `StructType` with a user-specified schema. Example usage:

```scala
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("a", IntegerType)

df.select(from_json($"value", schema) as 'json) // => [json: <a: int>]
```

This PR adds support for Java, Scala and Python. I leveraged our existing JSON parsing support by moving it into catalyst (so that we could define expressions using it). I left SQL out for now, because I'm not sure how users would specify a schema.

Author: Michael Armbrust <[email protected]>

Closes #15274 from marmbrus/jsonParser.
1 parent 027dea8 commit fe33121
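As the Python docstring below notes, `from_json` returns `null` for unparseable input instead of failing the query. A minimal end-to-end sketch of that behavior (column name is illustrative; assumes a 2.1-era session with `spark.implicits._` in scope):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._

val schema = new StructType().add("a", IntegerType)

// One well-formed and one malformed JSON string.
val df = Seq("""{"a": 1}""", """{"a" 1}""").toDF("value")

// The malformed row surfaces as null rather than an exception.
df.select(from_json($"value", schema) as "json").collect()
// => Array([[1]], [null])
```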

File tree

19 files changed: +198 −23 lines

python/pyspark/sql/functions.py

Lines changed: 23 additions & 0 deletions

```diff
@@ -1706,6 +1706,29 @@ def json_tuple(col, *fields):
     return Column(jc)
 
 
+@since(2.1)
+def from_json(col, schema, options={}):
+    """
+    Parses a column containing a JSON string into a [[StructType]] with the
+    specified schema. Returns `null`, in the case of an unparseable string.
+
+    :param col: string column in json format
+    :param schema: a StructType to use when parsing the json column
+    :param options: options to control parsing. accepts the same options as the json datasource
+
+    >>> from pyspark.sql.types import *
+    >>> data = [(1, '''{"a": 1}''')]
+    >>> schema = StructType([StructField("a", IntegerType())])
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(from_json(df.value, schema).alias("json")).collect()
+    [Row(json=Row(a=1))]
+    """
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.from_json(_to_java_column(col), schema.json(), options)
+    return Column(jc)
+
+
 @since(1.5)
 def size(col):
     """
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 29 additions & 2 deletions

```diff
@@ -23,10 +23,12 @@ import scala.util.parsing.combinator.RegexParsers
 
 import com.fasterxml.jackson.core._
 
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
+import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
@@ -467,3 +469,28 @@
   }
 }
 
+/**
+ * Converts an json input string to a [[StructType]] with the specified schema.
+ */
+case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
+  extends Expression with CodegenFallback with ExpectsInputTypes {
+  override def nullable: Boolean = true
+
+  @transient
+  lazy val parser =
+    new JacksonParser(
+      schema,
+      "invalid", // Not used since we force fail fast. Invalid rows will be set to `null`.
+      new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
+
+  override def dataType: DataType = schema
+  override def children: Seq[Expression] = child :: Nil
+
+  override def eval(input: InternalRow): Any = {
+    try parser.parse(child.eval(input).toString).head catch {
+      case _: SparkSQLJsonProcessingException => null
+    }
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
+}
```
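A quick sketch of how `JsonToStruct` behaves when evaluated directly, mirroring the tests added later in this commit (the child is a literal, so no input row is needed; `eval()` picks up the default argument inherited from `Expression`):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.{IntegerType, StructType}

val schema = new StructType().add("a", IntegerType)

// Well-formed input yields an InternalRow with the parsed fields.
JsonToStruct(schema, Map.empty, Literal("""{"a": 1}""")).eval()  // => [1]

// Malformed input: the forced-FAILFAST parser throws
// SparkSQLJsonProcessingException, which eval() converts to null.
JsonToStruct(schema, Map.empty, Literal("""{"a" 1}""")).eval()   // => null
```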
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala renamed to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala

Lines changed: 3 additions & 3 deletions

```diff
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
 
 /**
- * Options for the JSON data source.
+ * Options for parsing JSON data into Spark SQL rows.
  *
  * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
  */
```
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala renamed to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 8 additions & 5 deletions

```diff
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
 
 import java.io.ByteArrayOutputStream
 
@@ -28,19 +28,22 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
-private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
+private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
 
+/**
+ * Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
+ */
 class JacksonParser(
     schema: StructType,
     columnNameOfCorruptRecord: String,
     options: JSONOptions) extends Logging {
 
+  import JacksonUtils._
+  import ParseModes._
   import com.fasterxml.jackson.core.JsonToken._
 
   // A `ValueConverter` is responsible for converting a value from `JsonParser`
@@ -65,7 +68,7 @@ class JacksonParser(
   private def failedRecord(record: String): Seq[InternalRow] = {
     // create a row even if no corrupt record column is present
     if (options.failFast) {
-      throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
+      throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record")
     }
     if (options.dropMalformed) {
       if (!isWarningPrintedForMalformedRecord) {
```
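Replacing the bare `RuntimeException` with the (now `private[sql]`) `SparkSQLJsonProcessingException` is what lets `JsonToStruct` catch malformed-record failures specifically. A rough sketch of driving the parser directly, assuming code living inside Spark's `org.apache.spark.sql` package tree (several of these classes are `private[sql]`), with the `JSONOptions(Map[String, String])` constructor used by `JsonToStruct` above:

```scala
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
import org.apache.spark.sql.types.{IntegerType, StructType}

val schema = new StructType().add("a", IntegerType)
val parser = new JacksonParser(
  schema,
  "_corrupt_record", // corrupt-record column; not consulted in FAILFAST mode
  new JSONOptions(Map("mode" -> "FAILFAST")))

parser.parse("""{"a": 1}""")  // => Seq([1])

try parser.parse("""{"a" 1}""") catch {
  case e: SparkSQLJsonProcessingException =>
    println(e.getMessage)  // Malformed line in FAILFAST mode: {"a" 1}
}
```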
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala renamed to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
 
 import com.fasterxml.jackson.core.{JsonParser, JsonToken}
 
-private object JacksonUtils {
+object JacksonUtils {
   /**
    * Advance the parser until a null or a specific token is found
    */
```
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala renamed to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala

Lines changed: 3 additions & 3 deletions

```diff
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
+import org.apache.hadoop.io.compress._
 
 import org.apache.spark.util.Utils
 
-private[datasources] object CompressionCodecs {
+object CompressionCodecs {
   private val shortCompressionCodecNames = Map(
     "none" -> null,
     "uncompressed" -> null,
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala renamed to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
 
-private[datasources] object ParseModes {
+object ParseModes {
   val PERMISSIVE_MODE = "PERMISSIVE"
   val DROP_MALFORMED_MODE = "DROPMALFORMED"
   val FAIL_FAST_MODE = "FAILFAST"
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala

Lines changed: 26 additions & 0 deletions

```diff
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -317,4 +319,28 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
       InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
   }
+
+  test("from_json") {
+    val jsonData = """{"a": 1}"""
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      JsonToStruct(schema, Map.empty, Literal(jsonData)),
+      InternalRow.fromSeq(1 :: Nil)
+    )
+  }
+
+  test("from_json - invalid data") {
+    val jsonData = """{"a" 1}"""
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    checkEvaluation(
+      JsonToStruct(schema, Map.empty, Literal(jsonData)),
+      null
+    )
+
+    // Other modes should still return `null`.
+    checkEvaluation(
+      JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)),
+      null
+    )
+  }
 }
```
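Note what the second test pins down: because `JsonToStruct` appends `"mode" -> FAIL_FAST_MODE` after the user-supplied options, a caller cannot opt into PERMISSIVE handling; malformed input always comes back as `null`. Sketched below against an options-accepting `from_json` overload assumed from the Python wrapper earlier in this commit:

```scala
import org.apache.spark.sql.catalyst.util.ParseModes
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._

val schema = new StructType().add("a", IntegerType)
val df = Seq("""{"a" 1}""").toDF("value") // malformed on purpose

// Requesting PERMISSIVE mode has no effect: JsonToStruct forces FAILFAST
// internally, so the malformed row still parses to null.
df.select(from_json($"value", schema, Map("mode" -> ParseModes.PERMISSIVE_MODE)) as "json")
  .collect() // => Array([null])
```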

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 3 additions & 2 deletions

```diff
@@ -21,14 +21,15 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.Partition
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
+import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.json.InferSchema
 import org.apache.spark.sql.types.StructType
 
 /**
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala

Lines changed: 1 addition & 0 deletions

```diff
@@ -29,6 +29,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
```
