
Commit 45d798c

[SPARK-8278] Remove non-streaming JSON reader.
Author: Reynold Xin <[email protected]>

Closes #7501 from rxin/jsonrdd and squashes the following commits:

767ec55 [Reynold Xin] More Mima
51f456e [Reynold Xin] Mima exclude.
789cb80 [Reynold Xin] Fixed compilation error.
b4cf50d [Reynold Xin] [SPARK-8278] Remove non-streaming JSON reader.
1 parent 9914b1b commit 45d798c

File tree: 6 files changed, +29 -518 lines changed

project/MimaExcludes.scala

Lines changed: 3 additions & 0 deletions

@@ -64,6 +64,9 @@ object MimaExcludes {
         excludePackage("org.apache.spark.sql.execution"),
         // Parquet support is considered private.
         excludePackage("org.apache.spark.sql.parquet"),
+        // The old JSON RDD is removed in favor of streaming Jackson
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD$"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD"),
         // local function inside a method
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.sql.SQLContext.org$apache$spark$sql$SQLContext$$needsConversion$1")

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 3 additions & 12 deletions

@@ -27,7 +27,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.json.{JsonRDD, JSONRelation}
+import org.apache.spark.sql.json.JSONRelation
 import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types.StructType
@@ -236,17 +236,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
    */
   def json(jsonRDD: RDD[String]): DataFrame = {
     val samplingRatio = extraOptions.getOrElse("samplingRatio", "1.0").toDouble
-    if (sqlContext.conf.useJacksonStreamingAPI) {
-      sqlContext.baseRelationToDataFrame(
-        new JSONRelation(() => jsonRDD, None, samplingRatio, userSpecifiedSchema)(sqlContext))
-    } else {
-      val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
-      val appliedSchema = userSpecifiedSchema.getOrElse(
-        JsonRDD.nullTypeToStringType(
-          JsonRDD.inferSchema(jsonRDD, 1.0, columnNameOfCorruptJsonRecord)))
-      val rowRDD = JsonRDD.jsonStringToRow(jsonRDD, appliedSchema, columnNameOfCorruptJsonRecord)
-      sqlContext.internalCreateDataFrame(rowRDD, appliedSchema)
-    }
+    sqlContext.baseRelationToDataFrame(
+      new JSONRelation(() => jsonRDD, None, samplingRatio, userSpecifiedSchema)(sqlContext))
   }

   /**
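With the legacy branch gone, json(jsonRDD) unconditionally wraps the input in a JSONRelation, which infers the schema (unless one was supplied) and parses via the streaming Jackson path. A minimal spark-shell style usage sketch against the Spark 1.x API (the context setup and sample records are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(
      new SparkConf().setAppName("json-rdd-demo").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    val jsonRDD = sc.parallelize(Seq(
      """{"name":"a","age":1}""",
      """{"name":"b","age":2}"""))
    // "samplingRatio" controls what fraction of records is scanned
    // during schema inference; 1.0 scans everything.
    val df = sqlContext.read.option("samplingRatio", "1.0").json(jsonRDD)
    df.printSchema()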

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 0 additions & 5 deletions

@@ -401,9 +401,6 @@ private[spark] object SQLConf {
     "spark.sql.useSerializer2",
     defaultValue = Some(true), isPublic = false)

-  val USE_JACKSON_STREAMING_API = booleanConf("spark.sql.json.useJacksonStreamingAPI",
-    defaultValue = Some(true), doc = "<TODO>")
-
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -473,8 +470,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {

   private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2)

-  private[spark] def useJacksonStreamingAPI: Boolean = getConf(USE_JACKSON_STREAMING_API)
-
   private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)

   private[spark] def defaultSizeInBytes: Long =
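Dropping the conf entry also removes the escape hatch: before this commit a user could select the old tree-based parser at runtime, roughly as below (illustrative; after this change the key is unknown to SQLConf and setting it has no effect):

    // Previously possible opt-out, now gone along with the conf entry:
    sqlContext.setConf("spark.sql.json.useJacksonStreamingAPI", "false")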

sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala

Lines changed: 12 additions & 36 deletions

@@ -157,51 +157,27 @@ private[sql] class JSONRelation(
     }
   }

-  private val useJacksonStreamingAPI: Boolean = sqlContext.conf.useJacksonStreamingAPI
-
   override val needConversion: Boolean = false

   override lazy val schema = userSpecifiedSchema.getOrElse {
-    if (useJacksonStreamingAPI) {
-      InferSchema(
-        baseRDD(),
-        samplingRatio,
-        sqlContext.conf.columnNameOfCorruptRecord)
-    } else {
-      JsonRDD.nullTypeToStringType(
-        JsonRDD.inferSchema(
-          baseRDD(),
-          samplingRatio,
-          sqlContext.conf.columnNameOfCorruptRecord))
-    }
+    InferSchema(
+      baseRDD(),
+      samplingRatio,
+      sqlContext.conf.columnNameOfCorruptRecord)
   }

   override def buildScan(): RDD[Row] = {
-    if (useJacksonStreamingAPI) {
-      JacksonParser(
-        baseRDD(),
-        schema,
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    } else {
-      JsonRDD.jsonStringToRow(
-        baseRDD(),
-        schema,
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    }
+    JacksonParser(
+      baseRDD(),
+      schema,
+      sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
   }

   override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = {
-    if (useJacksonStreamingAPI) {
-      JacksonParser(
-        baseRDD(),
-        StructType.fromAttributes(requiredColumns),
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    } else {
-      JsonRDD.jsonStringToRow(
-        baseRDD(),
-        StructType.fromAttributes(requiredColumns),
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    }
+    JacksonParser(
+      baseRDD(),
+      StructType.fromAttributes(requiredColumns),
+      sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
   }

   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
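The "streaming" in the commit title refers to Jackson's incremental token API: the parser walks a document token by token instead of first materializing a whole object tree, as the removed JsonRDD path did via databind. A self-contained sketch of that parsing style using plain Jackson 2.x, outside Spark (this illustrates the technique, not JacksonParser's actual internals):

    import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

    object StreamingJsonDemo {
      def main(args: Array[String]): Unit = {
        val parser = new JsonFactory().createParser("""{"name":"a","age":1}""")
        // Advance token by token; no intermediate tree or Map is ever built.
        var token = parser.nextToken()
        while (token != null) {
          if (token == JsonToken.FIELD_NAME) {
            val field = parser.getCurrentName
            parser.nextToken() // step onto the field's value
            println(s"$field -> ${parser.getText}")
          }
          token = parser.nextToken()
        }
        parser.close()
      }
    }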
