@@ -60,7 +60,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Scanning partitioned HadoopFsRelation
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
if t.partitionSpec.partitionColumns.nonEmpty =>
t.refresh()
val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray

logInfo {
@@ -88,7 +87,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {

// Scanning non-partitioned HadoopFsRelation
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
t.refresh()
// See buildPartitionedTableScan for the reason that we need to create a shard
// broadcast HadoopConf.
val sharedHadoopConf = SparkHadoopUtil.get.conf
@@ -22,20 +22,22 @@ import java.io.CharArrayWriter
import com.fasterxml.jackson.core.JsonFactory
import com.google.common.base.Objects
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{Text, LongWritable, NullWritable}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext, Job}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}

import org.apache.spark.Logging
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mapred.SparkHadoopMapRedUtil

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionSpec
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.util.SerializableConfiguration

private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
@@ -105,6 +107,15 @@ private[sql] class JSONRelation(
jsonSchema
}

override private[sql] def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[String],
broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
Contributor: When we create RDDs for a partitioned JSON table, are we using this broadcastedConf?

Contributor Author: No, because we're not using SqlNewHadoopRDD here. This method was originally marked as final in HadoopFsRelation and wasn't supposed to be overridden in concrete data source implementations.

Contributor: Then, if we read a table with lots of partitions, the time spent planning the query will be pretty long, right?

Contributor: Let's file a follow-up JIRA to reuse this broadcastedConf, so that we don't broadcast the Hadoop conf for every RDD created for every partition.

Contributor: By the way, do we have a JIRA for investigating using SerializableConfiguration instead of this shared broadcast conf for 1.6?
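
A rough sketch of the follow-up idea discussed above: broadcast the Hadoop conf once per relation and hand the same broadcast handle to every per-partition scan, instead of broadcasting it again for each RDD. The PartitionedScanBuilder class and its members are hypothetical illustration, not Spark API, and SerializableConfiguration is Spark-internal, so this assumes code living inside Spark itself, as the PR does:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.SerializableConfiguration

// Hypothetical helper: broadcast the Hadoop configuration once per relation and
// reuse the same Broadcast for every per-partition scan, rather than
// broadcasting the conf again for each RDD that is created.
class PartitionedScanBuilder(sc: SparkContext, hadoopConf: Configuration) {
  private val sharedConf: Broadcast[SerializableConfiguration] =
    sc.broadcast(new SerializableConfiguration(hadoopConf))

  def scanPartition(paths: Array[String]): Unit = {
    // Every partition reuses sharedConf; only the broadcast handle is captured
    // by the closure, so the conf is shipped to executors only once.
    val conf: Configuration = sharedConf.value.value
    // ... build the per-partition RDD using `conf` and `paths` ...
  }
}
```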

refresh()
Contributor: Is this the same as the refresh() invoked in DataSourceStrategy? Every time we build the RDD, we need to refresh the file status.

Actually, I've updated #8023 by simply removing all of the refresh calls in DataSourceStrategy; InsertIntoHadoopFsRelation will update the file status implicitly while writing the data.

Contributor Author: The refresh() added in DataSourceStrategy also affects Parquet and ORC.

Contributor Author: I haven't finished reviewing #8023; let's have an offline discussion about it tomorrow :)

Contributor: Why does calling refresh() here work? Does inputPaths contain all file paths or directory paths?

Contributor Author: inputPaths contains the "base" input paths, while the inputPaths argument of the other buildScan method in JSONRelation contains FileStatuses of the leaf files under the base input paths, which are retrieved from the FileStatusCache in HadoopFsRelation.
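
For context, a minimal sketch of what re-listing the leaf files amounts to here; this is assumed behavior for illustration, not the actual FileStatusCache code (which also filters out hidden/metadata files and caches per-directory listings):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

// Re-list all leaf files under the base input paths, so files written after the
// relation was created become visible to the scan. This is roughly what
// refreshing the cached file statuses means.
def listLeafFiles(basePaths: Seq[String], conf: Configuration): Seq[FileStatus] = {
  basePaths.flatMap { p =>
    val path = new Path(p)
    val fs = path.getFileSystem(conf)
    def recurse(status: FileStatus): Seq[FileStatus] =
      if (status.isDirectory) fs.listStatus(status.getPath).toSeq.flatMap(recurse)
      else Seq(status)
    recurse(fs.getFileStatus(path))
  }
}
```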

Contributor: What issue? That cached metadata can be out of date when users add data directly? Because refreshing ORC and Parquet tables' metadata is expensive, I think that is expected behavior (users need to call refresh explicitly).

Contributor: OK, but it seems only ParquetRelation refreshes the metadata; for JSON and ORC, only the file status is refreshed.

The problem here is that once the table is cached, even if refresh() is called implicitly, we still won't get the refreshed data.

Contributor: For JSON, we don't have any metadata to cache other than the file status, right?

"once the table cached" => you mean after we call cache table? I do see we drop the cached logical plan from our cache manager. Can you try it and see if we drop the cached table data?

Contributor: I don't mean we need to refresh the schema/metadata, but that we should fetch the latest files under the table path.

Contributor: Since the files under the table path may have been changed by other applications, I think we have to refresh the file status before scanning the files.

Perhaps we can move the Parquet metadata refreshing out of HadoopFsRelation.refresh.
Update: I mean out of ParquetRelation.refresh().
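
A hypothetical sketch of that suggestion, just to make the shape concrete (class and method names other than refresh() are made up, not the actual Spark classes): keep the cheap file-status re-listing in refresh(), and let the Parquet relation trigger the expensive footer/schema re-discovery separately:

```scala
// Hypothetical shape of the suggestion above; not the actual Spark classes.
abstract class ExampleHadoopFsRelation {
  // Cheap: re-list the leaf files under the base paths (enough for JSON/ORC).
  protected def refreshFileStatuses(): Unit

  // refresh() only touches file statuses by default.
  def refresh(): Unit = refreshFileStatuses()
}

class ExampleParquetRelation extends ExampleHadoopFsRelation {
  protected def refreshFileStatuses(): Unit = { /* re-list leaf files */ }

  // Expensive: re-read Parquet footers to re-discover/merge the schema.
  // Kept out of refresh() so plain file-status refreshes stay cheap; callers
  // that really need fresh metadata invoke it explicitly.
  def refreshMetadata(): Unit = { /* re-read footers, merge schemas */ }
}
```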

super.buildScan(requiredColumns, filters, inputPaths, broadcastedConf)
}

override def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
@@ -535,7 +535,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
})
}

private[sql] final def buildScan(
private[sql] def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[String],
@@ -32,9 +32,9 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {

var path: File = null

override def beforeAll: Unit = {
override def beforeAll(): Unit = {
path = Utils.createTempDir()
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
caseInsensitiveContext.read.json(rdd).registerTempTable("jt")
sql(
s"""
@@ -46,7 +46,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
""".stripMargin)
}

override def afterAll: Unit = {
override def afterAll(): Unit = {
caseInsensitiveContext.dropTempTable("jsonTable")
caseInsensitiveContext.dropTempTable("jt")
Utils.deleteRecursively(path)
@@ -110,7 +110,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
)

// Writing the table to less part files.
val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 5)
val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 5)
caseInsensitiveContext.read.json(rdd1).registerTempTable("jt1")
sql(
s"""
@@ -122,7 +122,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
)

// Writing the table to more part files.
val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 10)
val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 10)
caseInsensitiveContext.read.json(rdd2).registerTempTable("jt2")
sql(
s"""