
Commit ac00522

Fixes JSONRelation refreshing on read path

1 parent bb16405 commit ac00522

File tree

3 files changed: +32 -12 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 31 additions & 2 deletions
@@ -17,20 +17,21 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.spark.{Logging, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
+import org.apache.spark.sql.execution.datasources.json.JSONRelation
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{SaveMode, Strategy, execution, sources, _}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, TaskContext}
 
 /**
  * A Strategy for planning scans over data sources defined using the sources API.
@@ -61,6 +62,20 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Scanning partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
         if t.partitionSpec.partitionColumns.nonEmpty =>
+      // One characteristic of JSONRelation is that, after updating data within the input folder,
+      // users don't need to refresh the relation manually to read the most recent data. This is
+      // a feature inherited from the old version of JSONRelation (the one before migrating to
+      // HadoopFsRelation). However, normal HadoopFsRelations don't share this characteristic.
+      // Here we specialize JSONRelation to do the refresh manually.
+      //
+      // Note that we can't do refreshing in JSONRelation.buildScan, because buildScan is invoked
+      // for each individual partition.
+      //
+      // Please refer to SPARK-10289 and SPARK-9743 for more details.
+      if (t.isInstanceOf[JSONRelation]) {
+        t.refresh()
+      }
+
       val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
 
       logInfo {
@@ -88,6 +103,20 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
 
     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
+      // One characteristic of JSONRelation is that, after updating data within the input folder,
+      // users don't need to refresh the relation manually to read the most recent data. This is
+      // a feature inherited from the old version of JSONRelation (the one before migrating to
+      // HadoopFsRelation). However, normal HadoopFsRelations don't share this characteristic.
+      // Here we specialize JSONRelation to do the refresh manually.
+      //
+      // Note that we can't do refreshing in JSONRelation.buildScan, because buildScan is invoked
+      // for each individual partition.
+      //
+      // Please refer to SPARK-10289 and SPARK-9743 for more details.
+      if (t.isInstanceOf[JSONRelation]) {
+        t.refresh()
+      }
+
       // See buildPartitionedTableScan for the reason that we need to create a shared
       // broadcast HadoopConf.
       val sharedHadoopConf = SparkHadoopUtil.get.conf
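
The comment in both hunks carries the core idea: the refresh must happen once, at planning time, before the scan fans out over partitions. The following is a minimal, self-contained sketch of that pattern, not Spark's actual classes; names such as FileBackedRelation, JsonLikeRelation, and refreshOncePerQuery are hypothetical stand-ins.

// Sketch of "refresh once in the planner, not once per partition".
abstract class FileBackedRelation {
  private var cachedFiles: Seq[String] = Nil

  // Re-list the input folder so newly written files become visible.
  def refresh(): Unit = { cachedFiles = listInputFiles() }

  def listInputFiles(): Seq[String]

  // Invoked once per partition by the planner; must stay cheap and
  // must not re-list the folder itself.
  def buildScan(partition: Int): Seq[String] =
    cachedFiles.map(f => s"partition=$partition file=$f")
}

class JsonLikeRelation(files: => Seq[String]) extends FileBackedRelation {
  override def listInputFiles(): Seq[String] = files
}

object PlannerSketch {
  // Refresh once, up front, then fan out over partitions. Putting refresh()
  // inside buildScan would re-list the folder numPartitions times.
  def refreshOncePerQuery(relation: FileBackedRelation, numPartitions: Int): Seq[String] = {
    relation match {
      case r: JsonLikeRelation => r.refresh() // specialize, as DataSourceStrategy does for JSONRelation
      case _                   => ()          // other relations keep stale-until-refreshed semantics
    }
    (0 until numPartitions).flatMap(relation.buildScan)
  }

  def main(args: Array[String]): Unit = {
    var folder = Seq("part-00000.json")
    val rel = new JsonLikeRelation(folder)
    folder = folder :+ "part-00001.json" // simulate a concurrent write to the input folder
    refreshOncePerQuery(rel, numPartitions = 2).foreach(println) // both files visible in both partitions
  }
}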

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala

Lines changed: 0 additions & 9 deletions
@@ -111,15 +111,6 @@ private[sql] class JSONRelation(
     jsonSchema
   }
 
-  override private[sql] def buildScan(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      inputPaths: Array[String],
-      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
-    refresh()
-    super.buildScan(requiredColumns, filters, inputPaths, broadcastedConf)
-  }
-
   override def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
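
To see why the removed override was a problem, here is a hypothetical back-of-the-envelope sketch (names and counts are illustrative, not Spark code): with refresh() inside the per-partition buildScan, a query over N partitions re-lists the input directory N times.

object RefreshCountSketch {
  var refreshCalls = 0

  // Pretend this is an expensive directory listing.
  def refresh(): Unit = refreshCalls += 1

  // The removed JSONRelation override did effectively this on every partition.
  def buildScanPerPartition(partition: Int): Unit = refresh()

  def main(args: Array[String]): Unit = {
    val numPartitions = 200
    (0 until numPartitions).foreach(buildScanPerPartition)
    println(s"refresh() invoked $refreshCalls times for one query") // 200, not 1
  }
}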

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 1 addition & 1 deletion
@@ -562,7 +562,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     })
   }
 
-  private[sql] def buildScan(
+  final private[sql] def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
       inputPaths: Array[String],
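
Marking this buildScan variant final is what keeps the fix from regressing: a subclass can no longer sneak refresh() back into the per-partition path, because the override fails at compile time. A minimal sketch of the mechanism, with hypothetical names rather than Spark's API:

abstract class RelationBase {
  // Sealed against overriding; subclasses customize scanPath instead.
  final def buildScan(inputPaths: Array[String]): Seq[String] =
    inputPaths.toSeq.flatMap(scanPath)

  // The extension point subclasses are meant to implement.
  def scanPath(path: String): Seq[String]
}

class JsonRelationLike extends RelationBase {
  override def scanPath(path: String): Seq[String] = Seq(s"rows from $path")

  // Uncommenting this fails to compile:
  //   "method buildScan cannot override final member"
  // override def buildScan(inputPaths: Array[String]): Seq[String] = ???
}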
