From 4da7a22d2195c77e27aa4f3aa957b1fdc0d57f5b Mon Sep 17 00:00:00 2001
From: Rajesh Balamohan
Date: Thu, 21 Jan 2016 10:39:09 +0530
Subject: [PATCH] SPARK-12948. [SQL]. Consider reducing size of broadcasts in
 OrcRelation

---
 .../scala/org/apache/spark/SparkContext.scala | 23 ++++++
 .../apache/spark/scheduler/DAGScheduler.scala |  2 +-
 .../spark/sql/hive/orc/OrcRelation.scala      | 74 +++++++++++--------
 3 files changed, 67 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 77acb7052ddf..564af41dea49 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -967,6 +967,29 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
   }
 
+  /**
+   * Get an RDD for a Hadoop-readable dataset from the Hadoop JobConf.
+   *
+   * @param broadcastedConf A general Hadoop Configuration, or a subclass of it.
+   * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf
+   *   that HadoopRDD creates.
+   * @param inputFormatClass Class of the InputFormat
+   * @param keyClass Class of the keys
+   * @param valueClass Class of the values
+   * @param minPartitions Minimum number of Hadoop Splits to generate.
+   */
+  def hadoopRDD[K, V](
+      broadcastedConf: Broadcast[SerializableConfiguration],
+      initLocalJobConfFuncOpt: Option[JobConf => Unit],
+      inputFormatClass: Class[_ <: InputFormat[K, V]],
+      keyClass: Class[K],
+      valueClass: Class[V],
+      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
+    assertNotStopped()
+    new HadoopRDD(this, broadcastedConf, initLocalJobConfFuncOpt,
+      inputFormatClass, keyClass, valueClass, minPartitions)
+  }
+
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat
    *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6b01a10fc136..1208f986531d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1006,7 +1006,7 @@ class DAGScheduler(
         case stage: ResultStage =>
           JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
       }
-
+      logDebug(s"Size of broadcasted task binary: ${taskBinaryBytes.length}")
      taskBinary = sc.broadcast(taskBinaryBytes)
     } catch {
       // In the case of a failure during serialization, abort the stage.
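Aside (not part of the patch): a minimal sketch of how the broadcast-based hadoopRDD overload added above might be called. It assumes code living inside Spark itself, since SerializableConfiguration is private[spark]; the TextInputFormat/LongWritable/Text classes and the input path are illustrative, not taken from the patch.

  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

  import org.apache.spark.SparkContext
  import org.apache.spark.util.SerializableConfiguration

  // Broadcast the Hadoop configuration once; each task then carries only the small
  // broadcast handle plus the JobConf => Unit initializer closure, instead of a
  // fully serialized JobConf inside the task binary.
  def scanWithBroadcastedConf(sc: SparkContext): Long = {
    val broadcastedConf =
      sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

    // Runs on the executor against its local copy of the broadcasted configuration.
    val initLocalJobConf: JobConf => Unit = { job =>
      FileInputFormat.setInputPaths(job, "/tmp/example/input") // illustrative path
    }

    sc.hadoopRDD(
      broadcastedConf,
      Some(initLocalJobConf),
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text]
    ).count()
  }

The intent, per SPARK-12948, is that the configuration travels once via the broadcast rather than once per serialized task binary.
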
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 40409169b095..abc7e95b5f8a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -27,9 +27,8 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSer
 import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
 import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
 import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 
 import org.apache.spark.Logging
 import org.apache.spark.broadcast.Broadcast
@@ -42,6 +41,7 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreType
 import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.Utils
 
 private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister {
 
@@ -207,7 +207,9 @@ private[sql] class OrcRelation(
       inputPaths: Array[FileStatus],
       broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
     val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
-    OrcTableScan(output, this, filters, inputPaths).execute()
+    Utils.withDummyCallSite(sqlContext.sparkContext) {
+      OrcTableScan(output, this, filters, inputPaths, broadcastedConf).execute()
+    }
   }
 
   override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
@@ -237,21 +239,13 @@ private[orc] case class OrcTableScan(
     attributes: Seq[Attribute],
     @transient relation: OrcRelation,
     filters: Array[Filter],
-    @transient inputPaths: Array[FileStatus])
+    @transient inputPaths: Array[FileStatus],
+    broadcastedConf: Broadcast[SerializableConfiguration])
   extends Logging
   with HiveInspectors {
 
   @transient private val sqlContext = relation.sqlContext
 
-  private def addColumnIds(
-      output: Seq[Attribute],
-      relation: OrcRelation,
-      conf: Configuration): Unit = {
-    val ids = output.map(a => relation.dataSchema.fieldIndex(a.name): Integer)
-    val (sortedIds, sortedNames) = ids.zip(attributes.map(_.name)).sorted.unzip
-    HiveShim.appendReadColumns(conf, sortedIds, sortedNames)
-  }
-
   // Transform all given raw `Writable`s into `InternalRow`s.
   private def fillObject(
       path: String,
@@ -293,42 +287,37 @@ private[orc] case class OrcTableScan(
   }
 
   def execute(): RDD[InternalRow] = {
-    val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
-    val conf = job.getConfiguration
-
-    // Tries to push down filters if ORC filter push-down is enabled
-    if (sqlContext.conf.orcFilterPushDown) {
-      OrcFilters.createFilter(filters).foreach { f =>
-        conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
-        conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
-      }
-    }
-
-    // Sets requested columns
-    addColumnIds(attributes, relation, conf)
 
     if (inputPaths.isEmpty) {
       // the input path probably be pruned, return an empty RDD.
       return sqlContext.sparkContext.emptyRDD[InternalRow]
     }
 
-    FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*)
+
+    val ids = attributes.map(a => relation.dataSchema.fieldIndex(a.name): Integer)
+    val (sortedIds, sortedNames) = ids.zip(attributes.map(_.name)).sorted.unzip
+
+    // Get the paths as fileStatus is not serializable
+    val setInputPaths =
+      OrcTableScan.setupConfigs(inputPaths.map(_.getPath.toString),
+        sortedIds, sortedNames, sqlContext.conf.orcFilterPushDown, filters) _
+
 
     val inputFormatClass = classOf[OrcInputFormat]
       .asInstanceOf[Class[_ <: MapRedInputFormat[NullWritable, Writable]]]
 
     val rdd = sqlContext.sparkContext.hadoopRDD(
-      conf.asInstanceOf[JobConf],
+      broadcastedConf,
+      Some(setInputPaths),
       inputFormatClass,
       classOf[NullWritable],
       classOf[Writable]
     ).asInstanceOf[HadoopRDD[NullWritable, Writable]]
 
-    val wrappedConf = new SerializableConfiguration(conf)
-
     rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
       val writableIterator = iterator.map(_._2)
-      fillObject(split.getPath.toString, wrappedConf.value, writableIterator, attributes)
+      fillObject(split.getPath.toString, broadcastedConf.value.value,
+        writableIterator, attributes)
     }
   }
 }
@@ -336,4 +325,27 @@ private[orc] object OrcTableScan {
   // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
   private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
+
+  private[orc] def setupConfigs(
+      inputFiles: Array[String],
+      ids: Seq[Integer],
+      names: Seq[String],
+      filterPushDown: Boolean,
+      orcFilters: Array[Filter])(job: JobConf): Unit = {
+
+    HiveShim.appendReadColumns(job, ids, names)
+
+    if (filterPushDown) {
+      OrcFilters.createFilter(orcFilters).foreach { f =>
+        job.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
+        job.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
+      }
+    }
+
+    if (inputFiles.nonEmpty) {
+      // Set up the input paths
+      val inputPaths = inputFiles.map(i => new Path(i))
+      FileInputFormat.setInputPaths(job, inputPaths: _*)
+    }
+  }
 }
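
Aside (not part of the patch): a minimal sketch of how the curried setupConfigs helper above is meant to be used. Everything except the JobConf is bound on the driver, and the resulting JobConf => Unit closure is what HadoopRDD replays on each executor against its local copy of the broadcasted configuration. The column ids/names and the path are illustrative, and the snippet assumes code in the org.apache.spark.sql.hive.orc package, since OrcTableScan is private[orc].

  import org.apache.hadoop.mapred.JobConf

  import org.apache.spark.sql.sources.Filter

  // Bind everything but the JobConf on the driver (illustrative values).
  val requiredColumnIds: Seq[Integer] = Seq(0, 2)
  val requiredColumnNames: Seq[String] = Seq("id", "name")
  val initLocalJobConf: JobConf => Unit =
    OrcTableScan.setupConfigs(
      Array("/tmp/warehouse/orc_table/part-00000"), // illustrative path
      requiredColumnIds,
      requiredColumnNames,
      false,                // ORC filter push-down disabled in this sketch
      Array.empty[Filter]) _

  // Later, on an executor, HadoopRDD applies the closure to its local JobConf copy,
  // which appends the read columns, any push-down predicate, and the input paths.
  val localJobConf = new JobConf()
  initLocalJobConf(localJobConf)

Because only the path strings, column ids, and names are captured, the closure stays small and serializable, which is the point of passing setupConfigs to the new hadoopRDD overload instead of a fully populated JobConf.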