Commit f636c14

Author: Nathan Howell

Enable JsonRDD2 by default, add a flag to switch back to JsonRDD

1 parent 0bbc445 commit f636c14

File tree: 4 files changed, +102 -42 lines


sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
4 additions & 0 deletions

@@ -67,6 +67,8 @@ private[spark] object SQLConf {

   val USE_SQL_SERIALIZER2 = "spark.sql.useSerializer2"

+  val USE_JSONRDD2 = "spark.sql.json.useJsonRDD2"
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -160,6 +162,8 @@ private[sql] class SQLConf extends Serializable {

   private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2, "true").toBoolean

+  private[spark] def useJsonRDD2: Boolean = getConf(USE_JSONRDD2, "true").toBoolean
+
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
    * a broadcast value during the physical executions of join operations. Setting this to -1
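
Since USE_JSONRDD2 defaults to "true", the new parser is active out of the box and users must opt out explicitly. A minimal sketch of toggling the flag at runtime, assuming an existing SQLContext named sqlContext (setConf is the standard configuration setter):

  // Fall back to the original JsonRDD implementation for JSON parsing.
  sqlContext.setConf("spark.sql.json.useJsonRDD2", "false")

  // Restore the default (JsonRDD2).
  sqlContext.setConf("spark.sql.json.useJsonRDD2", "true")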

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
21 additions & 13 deletions

@@ -615,13 +615,17 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
-    val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
-    val appliedSchema =
-      Option(schema).getOrElse(
-        JsonRDD.nullTypeToStringType(
-          JsonRDD.inferSchema(json, 1.0, columnNameOfCorruptJsonRecord)))
-    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
-    createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+    if (conf.useJsonRDD2) {
+      baseRelationToDataFrame(new JSONRelation(json, None, 1.0, Some(schema))(this))
+    } else {
+      val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
+      val appliedSchema =
+        Option(schema).getOrElse(
+          JsonRDD.nullTypeToStringType(
+            JsonRDD.inferSchema(json, 1.0, columnNameOfCorruptJsonRecord)))
+      val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
+      createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+    }
   }

   /**
@@ -645,12 +649,16 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
-    val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
-    val appliedSchema =
-      JsonRDD.nullTypeToStringType(
-        JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
-    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
-    createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+    if (conf.useJsonRDD2) {
+      baseRelationToDataFrame(new JSONRelation(json, None, samplingRatio, None)(this))
+    } else {
+      val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
+      val appliedSchema =
+        JsonRDD.nullTypeToStringType(
+          JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
+      val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
+      createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+    }
   }

   /**
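
The public jsonRDD API is unchanged by this commit; the flag only selects which implementation runs underneath. A small usage sketch, assuming a live SparkContext sc and SQLContext sqlContext:

  // Build a DataFrame from an RDD of JSON strings; with the flag on (the default)
  // this goes through JSONRelation/JsonRDD2, otherwise through the legacy JsonRDD path.
  val json = sc.parallelize(Seq("""{"a": 1}""", """{"a": 2}"""))
  val df = sqlContext.jsonRDD(json, 1.0)  // samplingRatio = 1.0: scan all records for schema inference
  df.printSchema()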

sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
58 additions & 19 deletions

@@ -22,14 +22,16 @@ import java.io.IOException
 import org.apache.hadoop.fs.Path

 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute, Row}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}


 private[sql] class DefaultSource
-  extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
+  extends RelationProvider
+  with SchemaRelationProvider
+  with CreatableRelationProvider {

   private def checkPath(parameters: Map[String, String]): String = {
     parameters.getOrElse("path", sys.error("'path' must be specified for json data."))
@@ -42,7 +44,7 @@ private[sql] class DefaultSource
     val path = checkPath(parameters)
     val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

-    JSONRelation(path, samplingRatio, None)(sqlContext)
+    new JSONRelation(path, samplingRatio, None, sqlContext)
   }

   /** Returns a new base relation with the given schema and parameters. */
@@ -53,7 +55,7 @@ private[sql] class DefaultSource
     val path = checkPath(parameters)
     val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

-    JSONRelation(path, samplingRatio, Some(schema))(sqlContext)
+    new JSONRelation(path, samplingRatio, Some(schema), sqlContext)
   }

   override def createRelation(
@@ -101,32 +103,69 @@ private[sql] class DefaultSource
   }
 }

-private[sql] case class JSONRelation(
-    path: String,
-    samplingRatio: Double,
+private[sql] class JSONRelation(
+    baseRDD: => RDD[String],
+    val path: Option[String],
+    val samplingRatio: Double,
     userSpecifiedSchema: Option[StructType])(
     @transient val sqlContext: SQLContext)
   extends BaseRelation
   with TableScan
   with InsertableRelation {

-  // TODO: Support partitioned JSON relation.
-  private def baseRDD = sqlContext.sparkContext.textFile(path)
+  def this(
+      path: String,
+      samplingRatio: Double,
+      userSpecifiedSchema: Option[StructType],
+      sqlContext: SQLContext) =
+    this(
+      sqlContext.sparkContext.textFile(path),
+      Some(path),
+      samplingRatio,
+      userSpecifiedSchema)(sqlContext)
+
+  private val useJsonRDD2: Boolean = sqlContext.conf.useJsonRDD2

   override val needConversion: Boolean = false

-  override val schema = userSpecifiedSchema.getOrElse(
-    JsonRDD.nullTypeToStringType(
-      JsonRDD.inferSchema(
+  override lazy val schema = userSpecifiedSchema.getOrElse {
+    if (useJsonRDD2) {
+      JsonRDD2.nullTypeToStringType(
+        JsonRDD2.inferSchema(
+          baseRDD,
+          samplingRatio,
+          sqlContext.conf.columnNameOfCorruptRecord))
+    } else {
+      JsonRDD.nullTypeToStringType(
+        JsonRDD.inferSchema(
+          baseRDD,
+          samplingRatio,
+          sqlContext.conf.columnNameOfCorruptRecord))
+    }
+  }
+
+  override def buildScan(): RDD[Row] = {
+    if (useJsonRDD2) {
+      JsonRDD2.jsonStringToRow(
+        baseRDD,
+        schema,
+        sqlContext.conf.columnNameOfCorruptRecord)
+    } else {
+      JsonRDD.jsonStringToRow(
         baseRDD,
-        samplingRatio,
-        sqlContext.conf.columnNameOfCorruptRecord)))
+        schema,
+        sqlContext.conf.columnNameOfCorruptRecord)
+    }
+  }

-  override def buildScan(): RDD[Row] =
-    JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord)

   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
-    val filesystemPath = new Path(path)
+    val filesystemPath = path match {
+      case Some(p) => new Path(p)
+      case None =>
+        throw new IOException(s"Cannot INSERT into table with no path defined")
+    }
+
     val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)

     if (overwrite) {
@@ -147,7 +186,7 @@ private[sql] case class JSONRelation(
        }
      }
      // Write the data.
-     data.toJSON.saveAsTextFile(path)
+     data.toJSON.saveAsTextFile(filesystemPath.toString)
      // Right now, we assume that the schema is not changed. We will not update the schema.
      // schema = data.schema
    } else {
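
One detail worth noting: the primary constructor now takes baseRDD by name (baseRDD: => RDD[String]), so the RDD expression is evaluated on access rather than captured eagerly; the path-based auxiliary constructor therefore defers sqlContext.sparkContext.textFile(path) until the schema is inferred or the relation is scanned. A self-contained sketch of the by-name semantics (illustrative only, not the Spark class):

  // `source` is a by-name parameter: it is re-evaluated on each access,
  // and not evaluated at all if `lines` is never called.
  class LazyRelation(source: => Seq[String]) {
    def lines: Seq[String] = source
  }

  val rel = new LazyRelation({ println("evaluated"); Seq("""{"a": 1}""") })
  // Nothing has been printed yet; "evaluated" appears only when accessed:
  rel.lines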

sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
19 additions & 10 deletions

@@ -580,19 +580,19 @@ class JsonSuite extends QueryTest {
     val analyzed = jsonDF.queryExecution.analyzed
     assert(
       analyzed.isInstanceOf[LogicalRelation],
-      "The DataFrame returned by jsonFile should be based on JSONRelation.")
+      "The DataFrame returned by jsonFile should be based on LogicalRelation.")
     val relation = analyzed.asInstanceOf[LogicalRelation].relation
     assert(
       relation.isInstanceOf[JSONRelation],
       "The DataFrame returned by jsonFile should be based on JSONRelation.")
-    assert(relation.asInstanceOf[JSONRelation].path === path)
+    assert(relation.asInstanceOf[JSONRelation].path === Some(path))
     assert(relation.asInstanceOf[JSONRelation].samplingRatio === (0.49 +- 0.001))

     val schema = StructType(StructField("a", LongType, true) :: Nil)
     val logicalRelation =
       jsonFile(path, schema).queryExecution.analyzed.asInstanceOf[LogicalRelation]
     val relationWithSchema = logicalRelation.relation.asInstanceOf[JSONRelation]
-    assert(relationWithSchema.path === path)
+    assert(relationWithSchema.path === Some(path))
     assert(relationWithSchema.schema === schema)
     assert(relationWithSchema.samplingRatio > 0.99)
   }
@@ -1034,15 +1034,24 @@ class JsonSuite extends QueryTest {
   }

   test("JSONRelation equality test") {
-    val relation1 =
-      JSONRelation("path", 1.0, Some(StructType(StructField("a", IntegerType, true) :: Nil)))(null)
+    val context = org.apache.spark.sql.test.TestSQLContext
+    val relation1 = new JSONRelation(
+      "path",
+      1.0,
+      Some(StructType(StructField("a", IntegerType, true) :: Nil)),
+      context)
     val logicalRelation1 = LogicalRelation(relation1)
-    val relation2 =
-      JSONRelation("path", 0.5, Some(StructType(StructField("a", IntegerType, true) :: Nil)))(
-        org.apache.spark.sql.test.TestSQLContext)
+    val relation2 = new JSONRelation(
+      "path",
+      0.5,
+      Some(StructType(StructField("a", IntegerType, true) :: Nil)),
+      context)
     val logicalRelation2 = LogicalRelation(relation2)
-    val relation3 =
-      JSONRelation("path", 1.0, Some(StructType(StructField("b", StringType, true) :: Nil)))(null)
+    val relation3 = new JSONRelation(
+      "path",
+      1.0,
+      Some(StructType(StructField("b", StringType, true) :: Nil)),
+      context)
     val logicalRelation3 = LogicalRelation(relation3)

     assert(relation1 === relation2)
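
Because JSONRelation is no longer a case class, it loses the compiler-generated structural equals/hashCode, yet the test still expects relation1 === relation2 even though their sampling ratios differ (1.0 vs 0.5). That implies an explicit equality override keyed on path and schema but not samplingRatio. A hypothetical, self-contained sketch of that pattern (the committed override may differ):

  // Illustrative class, not the Spark one: equality ignores samplingRatio on purpose.
  class Relation(val path: Option[String], val schema: String, val samplingRatio: Double) {
    override def equals(other: Any): Boolean = other match {
      case that: Relation => this.path == that.path && this.schema == that.schema
      case _ => false
    }
    override def hashCode(): Int = 41 * (41 + path.hashCode) + schema.hashCode
  }

Under this sketch, new Relation(Some("path"), "a: int", 1.0) == new Relation(Some("path"), "a: int", 0.5) holds, mirroring the assertion above.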
