Skip to content

Commit 62f56a7

Browse files
committed
[SPARK-17699] Support for parsing JSON string columns
1 parent 2f84a68 commit 62f56a7

File tree

19 files changed

+195
-22
lines changed

19 files changed

+195
-22
lines changed

python/pyspark/sql/functions.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1705,6 +1705,29 @@ def json_tuple(col, *fields):
17051705
jc = sc._jvm.functions.json_tuple(_to_java_column(col), _to_seq(sc, fields))
17061706
return Column(jc)
17071707

1708+
@since(2.1)
1709+
def from_json(col, schema, options={}):
1710+
"""
1711+
Parses a column containing a JSON string into a [[StructType]] with the
1712+
specified schema. Returns `null`, in the case of an unparseable string.
1713+
1714+
:param col: string column in json format
1715+
:param schema: a StructType to use when parsing the json column
1716+
:param options: options to control parsing. accepts the same options as the json datasource
1717+
1718+
>>> from pyspark.sql.types import *
1719+
>>> data = [(1, '''{"a": 1}''')]
1720+
>>> schema = StructType([StructField("a", IntegerType())])
1721+
>>> df = spark.createDataFrame(data, ("key", "value"))
1722+
>>> df.select(from_json(df.value, schema).alias("json")).collect()
1723+
[Row(json=Row(a=1))]
1724+
"""
1725+
1726+
sc = SparkContext._active_spark_context
1727+
jc = sc._jvm.functions.from_json(_to_java_column(col), schema.json(), options)
1728+
return Column(jc)
1729+
1730+
17081731

17091732
@since(1.5)
17101733
def size(col):

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ import scala.util.parsing.combinator.RegexParsers
2323

2424
import com.fasterxml.jackson.core._
2525

26-
import org.apache.spark.sql.catalyst.InternalRow
2726
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
2827
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
28+
import org.apache.spark.sql.catalyst.InternalRow
29+
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
30+
import org.apache.spark.sql.catalyst.util.ParseModes
2931
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
3032
import org.apache.spark.unsafe.types.UTF8String
3133
import org.apache.spark.util.Utils
@@ -467,3 +469,26 @@ case class JsonTuple(children: Seq[Expression])
467469
}
468470
}
469471

472+
/**
473+
* Converts an json input string to a [[StructType]] with the specified schema.
474+
*/
475+
case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
476+
extends Expression with CodegenFallback {
477+
override def nullable: Boolean = true
478+
479+
@transient
480+
lazy val parser =
481+
new JacksonParser(
482+
schema,
483+
"invalid", // Not used since we force fail fast. Invalid rows will be set to `null`.
484+
new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
485+
486+
override def dataType: DataType = schema
487+
override def children: Seq[Expression] = child :: Nil
488+
489+
override def eval(input: InternalRow): Any = {
490+
try parser.parse(child.eval(input).toString).head catch {
491+
case _: SparkSQLJsonProcessingException => null
492+
}
493+
}
494+
}
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.sql.execution.datasources.json
18+
package org.apache.spark.sql.catalyst.json
1919

2020
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
2121
import org.apache.commons.lang3.time.FastDateFormat
2222

2323
import org.apache.spark.internal.Logging
24-
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
24+
import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
2525

2626
/**
27-
* Options for the JSON data source.
27+
* Options for parsing JSON data into Spark SQL rows.
2828
*
2929
* Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
3030
*/
Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.sql.execution.datasources.json
18+
package org.apache.spark.sql.catalyst.json
1919

2020
import java.io.ByteArrayOutputStream
2121

@@ -28,19 +28,22 @@ import org.apache.spark.internal.Logging
2828
import org.apache.spark.sql.catalyst.InternalRow
2929
import org.apache.spark.sql.catalyst.expressions._
3030
import org.apache.spark.sql.catalyst.util._
31-
import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
32-
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
3331
import org.apache.spark.sql.types._
3432
import org.apache.spark.unsafe.types.UTF8String
3533
import org.apache.spark.util.Utils
3634

37-
private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
35+
private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
3836

37+
/**
38+
* Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
39+
*/
3940
class JacksonParser(
4041
schema: StructType,
4142
columnNameOfCorruptRecord: String,
4243
options: JSONOptions) extends Logging {
4344

45+
import JacksonUtils._
46+
import ParseModes._
4447
import com.fasterxml.jackson.core.JsonToken._
4548

4649
// A `ValueConverter` is responsible for converting a value from `JsonParser`
@@ -65,7 +68,7 @@ class JacksonParser(
6568
private def failedRecord(record: String): Seq[InternalRow] = {
6669
// create a row even if no corrupt record column is present
6770
if (options.failFast) {
68-
throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
71+
throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record")
6972
}
7073
if (options.dropMalformed) {
7174
if (!isWarningPrintedForMalformedRecord) {
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.sql.execution.datasources.json
18+
package org.apache.spark.sql.catalyst.json
1919

2020
import com.fasterxml.jackson.core.{JsonParser, JsonToken}
2121

22-
private object JacksonUtils {
22+
object JacksonUtils {
2323
/**
2424
* Advance the parser until a null or a specific token is found
2525
*/
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.sql.execution.datasources
18+
package org.apache.spark.sql.catalyst.util
1919

2020
import org.apache.hadoop.conf.Configuration
2121
import org.apache.hadoop.io.SequenceFile.CompressionType
22-
import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
22+
import org.apache.hadoop.io.compress._
2323

2424
import org.apache.spark.util.Utils
2525

26-
private[datasources] object CompressionCodecs {
26+
object CompressionCodecs {
2727
private val shortCompressionCodecNames = Map(
2828
"none" -> null,
2929
"uncompressed" -> null,

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala renamed to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.sql.execution.datasources
18+
package org.apache.spark.sql.catalyst.util
1919

20-
private[datasources] object ParseModes {
20+
object ParseModes {
2121
val PERMISSIVE_MODE = "PERMISSIVE"
2222
val DROP_MALFORMED_MODE = "DROPMALFORMED"
2323
val FAIL_FAST_MODE = "FAILFAST"

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions
1919

2020
import org.apache.spark.SparkFunSuite
2121
import org.apache.spark.sql.catalyst.InternalRow
22+
import org.apache.spark.sql.catalyst.util.ParseModes
23+
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
2224
import org.apache.spark.unsafe.types.UTF8String
2325

2426
class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -317,4 +319,28 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
317319
JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
318320
InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
319321
}
322+
323+
test("from_json") {
324+
val jsonData = """{"a": 1}"""
325+
val schema = StructType(StructField("a", IntegerType) :: Nil)
326+
checkEvaluation(
327+
JsonToStruct(schema, Map.empty, Literal(jsonData)),
328+
InternalRow.fromSeq(1 :: Nil)
329+
)
330+
}
331+
332+
test("from_json - invalid data") {
333+
val jsonData = """{"a" 1}"""
334+
val schema = StructType(StructField("a", IntegerType) :: Nil)
335+
checkEvaluation(
336+
JsonToStruct(schema, Map.empty, Literal(jsonData)),
337+
null
338+
)
339+
340+
// Other modes should still return `null`.
341+
checkEvaluation(
342+
JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)),
343+
null
344+
)
345+
}
320346
}

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@ import java.util.Properties
2121

2222
import scala.collection.JavaConverters._
2323

24-
import org.apache.spark.Partition
2524
import org.apache.spark.api.java.JavaRDD
2625
import org.apache.spark.internal.Logging
26+
import org.apache.spark.Partition
2727
import org.apache.spark.rdd.RDD
28+
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
2829
import org.apache.spark.sql.execution.LogicalRDD
2930
import org.apache.spark.sql.execution.datasources.DataSource
3031
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
31-
import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
32+
import org.apache.spark.sql.execution.datasources.json.InferSchema
3233
import org.apache.spark.sql.types.StructType
3334

3435
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce._
2828
import org.apache.spark.rdd.RDD
2929
import org.apache.spark.sql.SparkSession
3030
import org.apache.spark.sql.catalyst.InternalRow
31+
import org.apache.spark.sql.catalyst.util.CompressionCodecs
3132
import org.apache.spark.sql.execution.datasources._
3233
import org.apache.spark.sql.sources._
3334
import org.apache.spark.sql.types._

0 commit comments

Comments
 (0)