Commit 10f7e3e

Author: Andrew Or (committed)
Avoid getting call sites and cleaning closures
1 parent 17e2943 commit 10f7e3e
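
Background: Spark records a user call site for every RDD it creates (shown in the web UI and in error messages), which requires walking the current thread's stack, and callers may additionally run the closure cleaner over functions they pass in. Both are per-RDD costs that become noticeable on hot paths such as Spark SQL's Parquet scans, where RDDs are constructed for every query. The rough, self-contained sketch below (a hypothetical timing helper, not Spark code and not part of this commit) illustrates why repeated stack walks are worth avoiding:

// Illustrative micro-benchmark (hypothetical, not part of this commit):
// capturing a stack trace, which call-site resolution relies on, is cheap
// once but adds up when it happens for every internally created RDD.
object StackWalkCost {
  def time(n: Int): Double = {
    val start = System.nanoTime()
    var i = 0
    while (i < n) {
      Thread.currentThread().getStackTrace  // roughly what call-site capture does
      i += 1
    }
    (System.nanoTime() - start) / 1e6  // milliseconds
  }

  def main(args: Array[String]): Unit = {
    println(s"10000 stack walks took ${time(10000)} ms")
  }
}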

File tree: 2 files changed (+50, -52 lines)

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 50 additions & 48 deletions
@@ -33,14 +33,15 @@ import parquet.hadoop._
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.hadoop.util.ContextUtil

+import org.apache.spark.{Partition => SparkPartition, SerializableWritable, Logging, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
-import org.apache.spark.{Partition => SparkPartition, SparkEnv, SerializableWritable, Logging, SparkException}
+import org.apache.spark.util.Utils

 private[sql] class DefaultSource extends HadoopFsRelationProvider {
   override def createRelation(
@@ -252,57 +253,58 @@ private[sql] class ParquetRelation2(

     val footers = inputFiles.map(f => metadataCache.footers(f.getPath))

-    // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
-    // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
-    // footers. Especially when a global arbitrative schema (either from metastore or data source
-    // DDL) is available.
-    new SqlNewHadoopRDD(
-      sc = sqlContext.sparkContext,
-      broadcastedConf = broadcastedConf,
-      initDriverSideJobFuncOpt = Some(setInputPaths),
-      initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
-      inputFormatClass = classOf[FilteringParquetRowInputFormat],
-      keyClass = classOf[Void],
-      valueClass = classOf[Row]) {
-
-      val cacheMetadata = useMetadataCache
-
-      @transient val cachedStatuses = inputFiles.map { f =>
-        // In order to encode the authority of a Path containing special characters such as /,
-        // we need to use the string returned by the URI of the path to create a new Path.
-        val pathWithAuthority = new Path(f.getPath.toUri.toString)
-
-        new FileStatus(
-          f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
-          f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
-      }.toSeq
-
-      @transient val cachedFooters = footers.map { f =>
-        // In order to encode the authority of a Path containing special characters such as /,
-        // we need to use the string returned by the URI of the path to create a new Path.
-        new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
-      }.toSeq
-
-      // Overridden so we can inject our own cached files statuses.
-      override def getPartitions: Array[SparkPartition] = {
-        val inputFormat = if (cacheMetadata) {
-          new FilteringParquetRowInputFormat {
-            override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
-
-            override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+    Utils.withDummyCallSite(sqlContext.sparkContext) {
+      // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
+      // After upgrading to Parquet 1.6.0, we should be able to stop caching `FileStatus` objects and
+      // footers. Especially when a global arbitrative schema (either from metastore or data source
+      // DDL) is available.
+      new SqlNewHadoopRDD(
+        sc = sqlContext.sparkContext,
+        broadcastedConf = broadcastedConf,
+        initDriverSideJobFuncOpt = Some(setInputPaths),
+        initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
+        inputFormatClass = classOf[FilteringParquetRowInputFormat],
+        keyClass = classOf[Void],
+        valueClass = classOf[Row]) {
+
+        val cacheMetadata = useMetadataCache
+
+        @transient val cachedStatuses = inputFiles.map { f =>
+          // In order to encode the authority of a Path containing special characters such as /,
+          // we need to use the string returned by the URI of the path to create a new Path.
+          val pathWithAuthority = new Path(f.getPath.toUri.toString)
+
+          new FileStatus(
+            f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
+            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
+        }.toSeq
+
+        @transient val cachedFooters = footers.map { f =>
+          // In order to encode the authority of a Path containing special characters such as /,
+          // we need to use the string returned by the URI of the path to create a new Path.
+          new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
+        }.toSeq
+
+        // Overridden so we can inject our own cached files statuses.
+        override def getPartitions: Array[SparkPartition] = {
+          val inputFormat = if (cacheMetadata) {
+            new FilteringParquetRowInputFormat {
+              override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatuses
+              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
+            }
+          } else {
+            new FilteringParquetRowInputFormat
           }
-        } else {
-          new FilteringParquetRowInputFormat
-        }

-        val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
-        val rawSplits = inputFormat.getSplits(jobContext)
+          val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
+          val rawSplits = inputFormat.getSplits(jobContext)

-        Array.tabulate[SparkPartition](rawSplits.size) { i =>
-          new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+          Array.tabulate[SparkPartition](rawSplits.size) { i =>
+            new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+          }
         }
-      }
-    }.values
+      }.values
+    }
   }

   private class MetadataCache {
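
The Utils.withDummyCallSite wrapper added above suppresses call-site capture for every RDD created inside the block. A minimal sketch of that pattern, assuming it works by temporarily blanking the call-site local properties and restoring them afterwards (the exact property keys are Spark-internal, so treat them as assumptions here):

import org.apache.spark.SparkContext

// Hedged sketch of the withDummyCallSite pattern: blank the call-site local
// properties so RDDs created inside `body` skip the stack walk, then restore
// whatever was set before. The property keys mirror Spark's internal
// "callSite.short"/"callSite.long" convention and are assumptions here.
def withDummyCallSite[T](sc: SparkContext)(body: => T): T = {
  val shortKey = "callSite.short"
  val longKey = "callSite.long"
  val oldShort = sc.getLocalProperty(shortKey)
  val oldLong = sc.getLocalProperty(longKey)
  try {
    sc.setLocalProperty(shortKey, "")
    sc.setLocalProperty(longKey, "")
    body
  } finally {
    // Restore the originals (possibly null) so later jobs still show a
    // meaningful call site in the UI.
    sc.setLocalProperty(shortKey, oldShort)
    sc.setLocalProperty(longKey, oldLong)
  }
}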

sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala

Lines changed: 0 additions & 4 deletions
@@ -75,10 +75,6 @@ private[sql] class SqlNewHadoopRDD[K, V](
   with SparkHadoopMapReduceUtil
   with Logging {

-  if (initLocalJobFuncOpt.isDefined) {
-    sc.clean(initLocalJobFuncOpt.get)
-  }
-
   protected def getJob(): Job = {
     val conf: Configuration = broadcastedConf.value.value
     // "new Job" will make a copy of the conf. Then, it is
