Skip to content

Commit 26fea31

Browse files
author
Nathan Howell
committed
Recreate the baseRDD for each scan operation
1 parent a7ebeb2 commit 26fea31

File tree

2 files changed

+15
-10
lines changed

2 files changed

+15
-10
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -616,7 +616,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
616616
@Experimental
617617
def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
618618
if (conf.useJacksonStreamingAPI) {
619-
baseRelationToDataFrame(new JSONRelation(json, None, 1.0, Some(schema))(this))
619+
baseRelationToDataFrame(new JSONRelation(() => json, None, 1.0, Some(schema))(this))
620620
} else {
621621
val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
622622
val appliedSchema =
@@ -650,7 +650,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
650650
@Experimental
651651
def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
652652
if (conf.useJacksonStreamingAPI) {
653-
baseRelationToDataFrame(new JSONRelation(json, None, samplingRatio, None)(this))
653+
baseRelationToDataFrame(new JSONRelation(() => json, None, samplingRatio, None)(this))
654654
} else {
655655
val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
656656
val appliedSchema =

sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala

Lines changed: 13 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -104,7 +104,12 @@ private[sql] class DefaultSource
104104
}
105105

106106
private[sql] class JSONRelation(
107-
baseRDD: RDD[String],
107+
// baseRDD is not immutable with respect to INSERT OVERWRITE
108+
// and so it must be recreated at least as often as the
109+
// underlying inputs are modified. To be safe, a function is
110+
// used instead of a regular RDD value to ensure a fresh RDD is
111+
// recreated for each and every operation.
112+
baseRDD: () => RDD[String],
108113
val path: Option[String],
109114
val samplingRatio: Double,
110115
userSpecifiedSchema: Option[StructType])(
@@ -120,7 +125,7 @@ private[sql] class JSONRelation(
120125
userSpecifiedSchema: Option[StructType],
121126
sqlContext: SQLContext) =
122127
this(
123-
sqlContext.sparkContext.textFile(path),
128+
() => sqlContext.sparkContext.textFile(path),
124129
Some(path),
125130
samplingRatio,
126131
userSpecifiedSchema)(sqlContext)
@@ -132,13 +137,13 @@ private[sql] class JSONRelation(
132137
override lazy val schema = userSpecifiedSchema.getOrElse {
133138
if (useJacksonStreamingAPI) {
134139
InferSchema(
135-
baseRDD,
140+
baseRDD(),
136141
samplingRatio,
137142
sqlContext.conf.columnNameOfCorruptRecord)
138143
} else {
139144
JsonRDD.nullTypeToStringType(
140145
JsonRDD.inferSchema(
141-
baseRDD,
146+
baseRDD(),
142147
samplingRatio,
143148
sqlContext.conf.columnNameOfCorruptRecord))
144149
}
@@ -147,12 +152,12 @@ private[sql] class JSONRelation(
147152
override def buildScan(): RDD[Row] = {
148153
if (useJacksonStreamingAPI) {
149154
JacksonParser(
150-
baseRDD,
155+
baseRDD(),
151156
schema,
152157
sqlContext.conf.columnNameOfCorruptRecord)
153158
} else {
154159
JsonRDD.jsonStringToRow(
155-
baseRDD,
160+
baseRDD(),
156161
schema,
157162
sqlContext.conf.columnNameOfCorruptRecord)
158163
}
@@ -161,12 +166,12 @@ private[sql] class JSONRelation(
161166
override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = {
162167
if (useJacksonStreamingAPI) {
163168
JacksonParser(
164-
baseRDD,
169+
baseRDD(),
165170
StructType.fromAttributes(requiredColumns),
166171
sqlContext.conf.columnNameOfCorruptRecord)
167172
} else {
168173
JsonRDD.jsonStringToRow(
169-
baseRDD,
174+
baseRDD(),
170175
StructType.fromAttributes(requiredColumns),
171176
sqlContext.conf.columnNameOfCorruptRecord)
172177
}

0 commit comments

Comments
 (0)