23 changes: 23 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -967,6 +967,29 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
}

/**
* Get an RDD for a Hadoop-readable dataset from a broadcasted Hadoop configuration.
*
* @param broadcastedConf A general Hadoop Configuration, or a subclass of it,
*                        wrapped in a broadcasted SerializableConfiguration.
* @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf
* that HadoopRDD creates.
* @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
* @param minPartitions Minimum number of Hadoop Splits to generate.
*/
def hadoopRDD[K, V](
broadcastedConf: Broadcast[SerializableConfiguration],
Contributor commented:

The idea here is to let users share the broadcast of the conf across multiple hadoopRDD calls (e.g. when unioning many HadoopRDDs together)? If so, this issue has come up a number of times in the past and may be worth a holistic design review because I think there are some hacks in Spark SQL to address this problem there and it would be nice to have a unified solution for this.

initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
new HadoopRDD(this, broadcastedConf, initLocalJobConfFuncOpt,
inputFormatClass, keyClass, valueClass, minPartitions)
}
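
A minimal driver-side sketch of the usage the new overload and the reviewer's comment describe: broadcast the Hadoop configuration once and reuse it across several hadoopRDD calls that are then unioned. This is not part of the diff; the object name, input directories, and the TextInputFormat choice are hypothetical, and it assumes SerializableConfiguration is visible to the calling code.

// Hypothetical sketch: share one broadcasted Hadoop configuration across many hadoopRDDs.
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.SerializableConfiguration

object SharedConfUnionExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("shared-conf").setMaster("local[*]"))

    // Broadcast the (potentially large) configuration a single time.
    val confBroadcast = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

    val dirs = Seq("/data/events/day=1", "/data/events/day=2")  // hypothetical input dirs
    val rdds = dirs.map { dir =>
      // Every RDD reuses the same broadcast; only the input path differs, and it is
      // applied on the executors through the JobConf init closure.
      sc.hadoopRDD(
        confBroadcast,
        Some((job: JobConf) => FileInputFormat.setInputPaths(job, dir)),
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text])
    }

    println(sc.union(rdds).count())
    sc.stop()
  }
}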

/** Get an RDD for a Hadoop file with an arbitrary InputFormat
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1006,7 +1006,7 @@ class DAGScheduler(
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}

logDebug(s"Size of broadcasted task binary: ${taskBinaryBytes.length}")
taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
// In the case of a failure during serialization, abort the stage.
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -27,9 +27,8 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSer
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

import org.apache.spark.Logging
import org.apache.spark.broadcast.Broadcast
@@ -42,6 +41,7 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreType
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.Utils

private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister {

@@ -207,7 +207,9 @@ private[sql] class OrcRelation(
inputPaths: Array[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
OrcTableScan(output, this, filters, inputPaths).execute()
Utils.withDummyCallSite(sqlContext.sparkContext) {
OrcTableScan(output, this, filters, inputPaths, broadcastedConf).execute()
}
}

override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
@@ -237,21 +239,13 @@ private[orc] case class OrcTableScan(
attributes: Seq[Attribute],
@transient relation: OrcRelation,
filters: Array[Filter],
@transient inputPaths: Array[FileStatus])
@transient inputPaths: Array[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration])
extends Logging
with HiveInspectors {

@transient private val sqlContext = relation.sqlContext

private def addColumnIds(
output: Seq[Attribute],
relation: OrcRelation,
conf: Configuration): Unit = {
val ids = output.map(a => relation.dataSchema.fieldIndex(a.name): Integer)
val (sortedIds, sortedNames) = ids.zip(attributes.map(_.name)).sorted.unzip
HiveShim.appendReadColumns(conf, sortedIds, sortedNames)
}

// Transform all given raw `Writable`s into `InternalRow`s.
private def fillObject(
path: String,
@@ -293,47 +287,65 @@ private[orc] case class OrcTableScan(
}

def execute(): RDD[InternalRow] = {
val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
val conf = job.getConfiguration

// Tries to push down filters if ORC filter push-down is enabled
if (sqlContext.conf.orcFilterPushDown) {
OrcFilters.createFilter(filters).foreach { f =>
conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
}
}

// Sets requested columns
addColumnIds(attributes, relation, conf)

if (inputPaths.isEmpty) {
// The input paths were probably pruned away; return an empty RDD.
return sqlContext.sparkContext.emptyRDD[InternalRow]
}
FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*)

val ids = attributes.map(a => relation.dataSchema.fieldIndex(a.name): Integer)
val (sortedIds, sortedNames) = ids.zip(attributes.map(_.name)).sorted.unzip

// Keep only the path strings, since FileStatus is not serializable
val setInputPaths =
OrcTableScan.setupConfigs(inputPaths.map(_.getPath.toString),
sortedIds, sortedNames, sqlContext.conf.orcFilterPushDown, filters) _


val inputFormatClass =
classOf[OrcInputFormat]
.asInstanceOf[Class[_ <: MapRedInputFormat[NullWritable, Writable]]]

val rdd = sqlContext.sparkContext.hadoopRDD(
conf.asInstanceOf[JobConf],
broadcastedConf,
Some(setInputPaths),
inputFormatClass,
classOf[NullWritable],
classOf[Writable]
).asInstanceOf[HadoopRDD[NullWritable, Writable]]

val wrappedConf = new SerializableConfiguration(conf)

rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
val writableIterator = iterator.map(_._2)
fillObject(split.getPath.toString, wrappedConf.value, writableIterator, attributes)
fillObject(split.getPath.toString, broadcastedConf.value.value,
writableIterator, attributes)
}
}
}

private[orc] object OrcTableScan {
// This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
private[orc] val SARG_PUSHDOWN = "sarg.pushdown"

private[orc] def setupConfigs(
inputFiles: Array[String],
ids: Seq[Integer],
names: Seq[String],
filterPushDown: Boolean,
orcFilters: Array[Filter])(job: JobConf): Unit = {

HiveShim.appendReadColumns(job, ids, names)

if (filterPushDown) {
OrcFilters.createFilter(orcFilters).foreach { f =>
job.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
job.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
}
}

if (inputFiles.nonEmpty) {
// Set up the input paths
val inputPaths = inputFiles.map(i => new Path(i))
FileInputFormat.setInputPaths(job, inputPaths: _*)
}
}
}
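
A side note on the refactor above: the partially applied setupConfigs(...) _ becomes the initLocalJobConfFuncOpt closure, so everything it captures must survive serialization on its way to the executors, which is why the diff passes plain path strings, column ids, and names rather than FileStatus objects. Below is a small stand-in sketch of that shape; the object name, paths, and config keys are placeholders (not the keys HiveShim actually sets), and the round-trip simply mimics how Spark would serialize the closure.

// Stand-in sketch: a JobConf-init closure that captures only serializable values,
// then a Java-serialization round-trip like the one Spark performs when shipping it.
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

object InitClosureSerializabilityCheck {
  def main(args: Array[String]): Unit = {
    val inputFiles = Array("/tmp/orc/part-00000")   // hypothetical input paths
    val ids: Seq[Integer] = Seq(0, 2).map(Int.box)  // requested column ids
    val names: Seq[String] = Seq("a", "c")          // requested column names

    // Same shape as the partially applied setupConfigs: only strings and boxed ints captured.
    val initFunc: JobConf => Unit = { job =>
      job.set("example.read.column.ids", ids.mkString(","))      // placeholder keys, standing in
      job.set("example.read.column.names", names.mkString(","))  // for HiveShim.appendReadColumns
      FileInputFormat.setInputPaths(job, inputFiles.map(new Path(_)): _*)
    }

    // Round-trip through Java serialization, as happens when the closure ships to executors.
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(initFunc)
    out.close()
    println("JobConf init closure serialized OK")
  }
}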