@@ -183,6 +183,16 @@ private[sql] case class InMemoryRelation(
batchStats).asInstanceOf[this.type]
}

private[sql] def withChild(newChild: SparkPlan): this.type = {
Contributor Author: @yhuai @liancheng After double-checking the source code, the Spark plan of InMemoryRelation is a PhysicalRDD, which holds the data source scan RDD as its property.

That is what I meant by saying we will not pick up the latest files under the path when the recache method is called: the RDD is already materialized and never changes. This PR re-creates the SparkPlan from the logical plan, so the DataSourceStrategy will rebuild the RDD based on the latest files.

See:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala#L99
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L312

I actually tried some other approaches to fix this:

  1. Update PhysicalRDD to take an RDDBuilder instead of the RDD as its property; however, this failed because it would impact too much existing code.
  2. Create a customized RDD that takes the path as a parameter (instead of the file statuses); however, this requires a lot of interface changes in HadoopFsRelation, since inputFiles: Array[FileStatus] is widely used by buildScan, and in particular the partition pruning is done in DataSourceStrategy, not in HadoopFsRelation.

new InMemoryRelation(
output.map(_.newInstance()),
useCompression,
batchSize,
storageLevel,
newChild,
tableName)().asInstanceOf[this.type]
}
Contributor: This method is equivalent to this.copy(child = newChild)(), since InMemoryRelation is a case class.


def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers

override protected def otherCopyArgs: Seq[AnyRef] =
@@ -27,7 +27,16 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

/** Holds a cached logical plan and its data */
private[sql] case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
private[sql] class CachedData(
val plan: LogicalPlan,
var cachedRepresentation: InMemoryRelation) {
private[sql] def recache(sqlContext: SQLContext): Unit = {
cachedRepresentation.uncache(true) // release the cache
Contributor Author: Instead of re-running the RDD, we re-create the RDD for the recache and then run it. Once the RDD has been created, we have no chance to change its input files any more, so the recache would not actually work.

// re-generate the spark plan and cache
cachedRepresentation =
cachedRepresentation.withChild(sqlContext.executePlan(plan).executedPlan)
}
}

/**
* Provides support in a SQLContext for caching query results and automatically using these cached
@@ -97,13 +106,13 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
logWarning("Asked to cache already cached data.")
} else {
cachedData +=
CachedData(
new CachedData(
planToCache,
InMemoryRelation(
sqlContext.conf.useCompression,
sqlContext.conf.columnBatchSize,
storageLevel,
sqlContext.executePlan(query.logicalPlan).executedPlan,
sqlContext.executePlan(planToCache).executedPlan,
Contributor: I don't think we should change this line. For example, an existing Parquet dataset may be overwritten, and the new dataset has a completely different schema. When this does happen, the original code can catch the error by performing the analysis phase.

Reply: This is just an optimization; we don't want to re-analyze the logical plan, as the analysis is done right before this call.

If you check the full code of the cacheQuery function, you will see that planToCache is just a local variable, not a logical plan cached somewhere else, so the case you are describing should not happen.

tableName))
}
}
@@ -156,10 +165,27 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
* function will over invalidate.
*/
private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock {
cachedData.foreach {
case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty =>
data.cachedRepresentation.recache()
case _ =>
var i = 0
var locatedIdx = -1
// find the index of the cached data, according to the specified logical plan
while (i < cachedData.length && locatedIdx < 0) {
cachedData(i) match {
case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty =>
locatedIdx = i
case _ =>
}
i += 1
}

if (locatedIdx >= 0) {
// If the cached data exists, remove it from the cached data list, as we need to
// re-generate the Spark plan and we don't want the stale entry to be used during
// the re-generation.
Contributor: There's no need to remove it first, since the whole method is wrapped in writeLock.

Reply: In SQLContext.QueryExecution we always try to load the cached PhysicalRDD for the given analyzed logical plan first; if we don't remove the entry from the cached list, we will always get the stale PhysicalRDD, which is not what we want, right?
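For context, a paraphrased sketch (not the exact source) of the substitution the reply refers to: CacheManager replaces any analyzed sub-plan that sameResult-matches a cached entry with its cached InMemoryRelation, so a stale entry left in cachedData would keep being picked up here.

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Paraphrased: substitute cached representations into an analyzed plan.
    private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = {
      plan transformDown {
        case fragment =>
          lookupCachedData(fragment)       // finds a CachedData whose plan sameResult-matches fragment
            .map(_.cachedRepresentation)   // the cached InMemoryRelation (wrapping the materialized RDD)
            .getOrElse(fragment)
      }
    }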

val entry = cachedData.remove(locatedIdx) // TODO do we have to use ArrayBuffer?
// rebuild the cache
entry.recache(sqlContext)
// add it back to the cache data list
cachedData += entry
Contributor: A problem with this change is that plan can appear multiple times in cachedData. For example (PySpark snippet):

df0 = sqlContext.range(10)
df1 = df0.filter(df0.id > 5).cache()
df2 = df0.filter(df0.id > 1).cache()

df1.count()
df2.count()

In the above case, the query plan of df0 appears twice, in df1 and df2. Since this method isn't performance critical, we can correct and simplify the block within writeLock to:

    cachedData.foreach { data =>
      if (data.plan.find(_.sameResult(plan)).isDefined) {
        data.recache(sqlContext)
      }
    }

Reply: Yes, that's a good catch. I will see how to fix the chained cached logical plans.

Reply: And we probably cannot simply use data.plan.find, as the cached data will most likely be wrapped in additional LogicalPlan nodes.

}
}
}
@@ -160,6 +160,9 @@ private[sql] case class InsertIntoHadoopFsRelation(
logInfo("Skipping insertion into a relation that already exists.")
}

// Invalidate the cache.
Contributor Author: @yhuai We need to refresh the relation to get the latest file status right here; however, we couldn't find the correct relation in the catalog. We probably need to add an API to the catalog that supports looking up a relation by its data path.

And even if the user refreshes the file status explicitly, I don't think we have the right API for that, do we?

sqlContext.cacheManager.invalidateCache(LogicalRelation(relation))
Contributor: This line should be moved right after the relation.refresh() call above, since we don't want to invalidate the cache when

  1. no insertion needs to be performed, or
  2. the write job fails
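A minimal sketch of the suggested ordering (the surrounding run() structure is paraphrased, not the exact InsertIntoHadoopFsRelation code; only the placement of the invalidateCache call matters here):

    // Invalidate the cache only on the successful-insert path,
    // right after the relation has refreshed its file listing.
    if (doInsertion) {
      // ... execute the write job ...
      relation.refresh()
      sqlContext.cacheManager.invalidateCache(LogicalRelation(relation))
    } else {
      logInfo("Skipping insertion into a relation that already exists.")
    }

    Seq.empty[Row]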

Reply: Yes, true, I will update it.


Seq.empty[Row]
}
}
@@ -111,15 +111,6 @@ private[sql] class JSONRelation(
jsonSchema
}

override private[sql] def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[String],
broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
refresh()
super.buildScan(requiredColumns, filters, inputPaths, broadcastedConf)
}

override def buildScan(
requiredColumns: Array[String],
filters: Array[Filter],
@@ -565,6 +565,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
filters: Array[Filter],
inputPaths: Array[String],
broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
refresh()
Contributor Author: @liancheng It seems refreshing the file status is unavoidable; let's do that right before getting the input files.

Contributor: Yeah, I agree. Basically it's impossible to

  1. Create a temporary JSON table pointing to path P
  2. Change the contents by arbitrary means without notifying the temporary table
  3. Read the table again and expect to get updated contents

In the old JSON relation implementation, the refreshing logic was handled by TextInputFormat.listStatus(), while the new JSONRelation relies on HadoopFsRelation. We can use SqlNewHadoopRDD and override the input format there to inject the FileStatus cache and avoid extra refreshing costs, similar to what we did in ParquetRelation.
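A rough sketch of the technique being described (an input format whose listStatus() serves a pre-computed listing); the class and object names are illustrative, and how the cache is actually populated and shared with tasks (configuration, broadcast, ...) is left open:

    import java.util.{List => JList}

    import org.apache.hadoop.fs.FileStatus
    import org.apache.hadoop.mapreduce.JobContext
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // Hypothetical process-local cache, e.g. filled in when the relation refreshes.
    object ListingCache {
      @volatile var statuses: Seq[FileStatus] = Seq.empty
    }

    // Illustrative input format: reuse the cached listing instead of re-listing
    // the input directories on every scan; fall back to a normal listing when
    // the cache is empty.
    class CachedListingTextInputFormat extends TextInputFormat {
      override protected def listStatus(job: JobContext): JList[FileStatus] = {
        val cached = ListingCache.statuses
        if (cached.nonEmpty) java.util.Arrays.asList(cached: _*)
        else super.listStatus(job)
      }
    }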

Contributor Author: I agree we'd better provide our own InputFormat; at least we can minimize the directories being refreshed. But that probably requires a lot of code changes; can we do that in a separate PR?

Contributor Author: We also need to refresh the partition directories before pruning partitions; we probably need to think further about how to fix that as well, in follow-up PR(s).

Contributor: Yeah, I'll probably work on this later this week; it can be relatively tricky to handle...

val inputStatuses = inputPaths.flatMap { input =>
val path = new Path(input)

@@ -231,17 +231,18 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
s"""
|INSERT OVERWRITE TABLE jsonTable SELECT a * 2, b FROM jt
""".stripMargin)

// jsonTable should be recached.
assertCached(sql("SELECT * FROM jsonTable"))
// TODO we need to invalidate the cached data in InsertIntoHadoopFsRelation
// // The cached data is the new data.
// checkAnswer(
// sql("SELECT a, b FROM jsonTable"),
// sql("SELECT a * 2, b FROM jt").collect())
//
// // Verify uncaching
// caseInsensitiveContext.uncacheTable("jsonTable")
// assertCached(sql("SELECT * FROM jsonTable"), 0)

// The cached data is the new data.
checkAnswer(
sql("SELECT a, b FROM jsonTable"),
sql("SELECT a * 2, b FROM jt").collect())

// Verify uncaching
caseInsensitiveContext.uncacheTable("jsonTable")
assertCached(sql("SELECT * FROM jsonTable"), 0)
}

test("it's not allowed to insert into a relation that is not an InsertableRelation") {
@@ -48,6 +48,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
StructField("b", StringType, nullable = false)))

lazy val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b")
lazy val testDF2 = (5 to 8).map(i => (i, s"val_$i")).toDF("a", "b")

lazy val partitionedTestDF1 = (for {
i <- 1 to 3
@@ -269,6 +270,30 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
}
}

test("invalidate the cached table - non-partitioned table") {
withTempPath { file =>
withTempTable("temp_datasource") {
sql(
s"""
|CREATE TEMPORARY TABLE temp_datasource (a int, b string)
|USING $dataSourceName
|OPTIONS (
| path '${file.toString}'
|)
""".stripMargin)

testDF.write.format(dataSourceName).mode(SaveMode.Overwrite).save(file.toString)
checkAnswer(sqlContext.table("temp_datasource"), testDF.orderBy("a").collect())

sqlContext.cacheTable("temp_datasource")
checkAnswer(sqlContext.table("temp_datasource"), testDF.orderBy("a").collect())

testDF2.write.format(dataSourceName).mode(SaveMode.Overwrite).save(file.toString)
checkAnswer(sqlContext.table("temp_datasource"), testDF2.orderBy("a").collect())
}
}
}

test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") {
Seq.empty[(Int, String)].toDF().registerTempTable("t")
