diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 6c1ef6a6df88..e6165116072b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -17,20 +17,21 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.spark.{Logging, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
+import org.apache.spark.sql.execution.datasources.json.JSONRelation
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{SaveMode, Strategy, execution, sources, _}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, TaskContext}
 
 /**
  * A Strategy for planning scans over data sources defined using the sources API.
@@ -61,6 +62,20 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Scanning partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
         if t.partitionSpec.partitionColumns.nonEmpty =>
+      // One characteristic of JSONRelation is that, after updating data within the input folder,
+      // users don't need to refresh the relation manually to read the most recent data. This is
+      // a feature inherited from the old version of JSONRelation (the one before migrating to
+      // HadoopFsRelation). However, normal HadoopFsRelations don't share this characteristic.
+      // Here we specialize JSONRelation to do the refresh manually.
+      //
+      // Note that we can't do refreshing in JSONRelation.buildScan, because buildScan is invoked
+      // for each individual partition.
+      //
+      // Please refer to SPARK-10289 and SPARK-9743 for more details.
+      if (t.isInstanceOf[JSONRelation]) {
+        t.refresh()
+      }
+
       val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
 
       logInfo {
@@ -88,6 +103,20 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
 
     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
+      // One characteristic of JSONRelation is that, after updating data within the input folder,
+      // users don't need to refresh the relation manually to read the most recent data. This is
+      // a feature inherited from the old version of JSONRelation (the one before migrating to
+      // HadoopFsRelation). However, normal HadoopFsRelations don't share this characteristic.
+      // Here we specialize JSONRelation to do the refresh manually.
+      //
+      // Note that we can't do refreshing in JSONRelation.buildScan, because buildScan is invoked
+      // for each individual partition.
+      //
+      // Please refer to SPARK-10289 and SPARK-9743 for more details.
+      if (t.isInstanceOf[JSONRelation]) {
+        t.refresh()
+      }
+
       // See buildPartitionedTableScan for the reason that we need to create a shard
       // broadcast HadoopConf.
       val sharedHadoopConf = SparkHadoopUtil.get.conf
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 114c8b211891..ab8ca5f748f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -111,15 +111,6 @@ private[sql] class JSONRelation(
     jsonSchema
   }
 
-  override private[sql] def buildScan(
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      inputPaths: Array[String],
-      broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
-    refresh()
-    super.buildScan(requiredColumns, filters, inputPaths, broadcastedConf)
-  }
-
   override def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index b3b326fe612c..dff726b33fc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -562,7 +562,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     })
   }
 
-  private[sql] def buildScan(
+  final private[sql] def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
       inputPaths: Array[String],
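
For context, the user-visible behavior this patch preserves: a query over a JSON data source picks up files added to the input folder after the DataFrame was created, with no manual refresh step. A minimal sketch of that behavior, assuming a Spark 1.5-era SQLContext; the /tmp/events path is illustrative:

import org.apache.spark.sql.SQLContext

def demo(sqlContext: SQLContext): Unit = {
  val df = sqlContext.read.json("/tmp/events")
  println(df.count())  // rows from the files present when the scan is planned

  // Suppose another job now drops additional JSON files into /tmp/events.
  // Because DataSourceStrategy calls refresh() on the JSONRelation before
  // planning each scan, simply re-running the query sees the new files.
  println(df.count())  // reflects the newly added files
}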
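
The interfaces.scala change makes the broadcast-based buildScan overload final, so no subclass can reintroduce a per-partition refresh: as the comment in DataSourceStrategy notes, that overload runs once per selected partition, and a refresh inside it would execute repeatedly and could observe a different file listing for each partition. A toy sketch of the resulting pattern, using illustrative names rather than Spark's actual classes:

abstract class FileRelation {
  // Final: subclasses can customize doScan but cannot add side effects here.
  final def buildScan(partitionPath: String): Seq[String] = doScan(partitionPath)
  protected def doScan(partitionPath: String): Seq[String]
  def refresh(): Unit = ()  // re-list input files; no-op in this sketch
}

def planScan(relation: FileRelation, partitions: Seq[String]): Seq[String] = {
  relation.refresh()  // invoked once up front, as DataSourceStrategy now does
  partitions.flatMap(relation.buildScan)
}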