diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 5500da83b2b6..0799ab17c574 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -605,6 +605,96 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or +## ORC Files + +[ORC](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC) is a columnar format supported in Hive. It provides a highly efficient way to store data on HDFS and speeds up query performance. +Spark SQL provides support for both reading and writing ORC files that automatically preserves the schema +of the original data. + +### Loading Data Programmatically + +Using the data from the above example: + +
+ +
+ +{% highlight scala %} +// Use HiveContext to read or write ORC files. +val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) +import sqlContext._ +val people: RDD[Person] = ... // An RDD of case class objects, from the previous example. + +// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using ORC. +people.registerTempTable("people") +people.saveAsOrcFile("people.orc") + +// Read in the ORC file created above. ORC files are self-describing so the schema is preserved. +// The result of loading an ORC file is also a SchemaRDD. +val orcFile = sqlContext.orcFile("people.orc") + +// ORC files can also be registered as tables and then used in SQL statements. +orcFile.registerTempTable("orcFile") +val teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19") +teenagers.map(t => "Name: " + t(0)).collect().foreach(println) +{% endhighlight %} + +
+ +
+ +{% highlight java %} +// Use JavaHiveContext to read or write ORC files. +JavaHiveContext sqlContext = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc); +JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example. + +// JavaSchemaRDDs can be saved as ORC files, maintaining the schema information. +schemaPeople.saveAsOrcFile("people.orc"); + +// Read in the ORC file created above. ORC files are self-describing so the schema is preserved. +// The result of loading an ORC file is also a JavaSchemaRDD. +JavaSchemaRDD orcFile = sqlContext.orcFile("people.orc"); + +// ORC files can also be registered as tables and then used in SQL statements. +orcFile.registerTempTable("orcFile"); +JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19"); +List<String> teenagerNames = teenagers.map(new Function<Row, String>() { + public String call(Row row) { + return "Name: " + row.getString(0); + } +}).collect(); +{% endhighlight %} + +
+ +
+ +{% highlight python %} +# Use HiveContext to read or write ORC files. +from pyspark.sql import HiveContext +sqlContext = HiveContext(sc) + +schemaPeople # The SchemaRDD from the previous example. + +# SchemaRDDs can be saved as ORC files, maintaining the schema information. +schemaPeople.saveAsOrcFile("people.orc") + +# Read in the ORC file created above. ORC files are self-describing so the schema is preserved. +# The result of loading an ORC file is also a SchemaRDD. +orcFile = sqlContext.orcFile("people.orc") + +# ORC files can also be registered as tables and then used in SQL statements. +orcFile.registerTempTable("orcFile") +teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19") +teenNames = teenagers.map(lambda p: "Name: " + p.name) +for teenName in teenNames.collect(): + print teenName +{% endhighlight %} + +
+ +
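In addition to the programmatic API above, this patch also wires ORC into the external data sources API (`DefaultSource` in `org.apache.spark.sql.hive.orc`). The following is a minimal sketch rather than part of the patch: it assumes a `HiveContext` named `sqlContext` and a hypothetical directory of ORC files at `/path/to/people.orc`, and mirrors the `CREATE TEMPORARY TABLE ... USING` syntax exercised in `OrcSourceSuite`.

{% highlight scala %}
// Register a directory of ORC files as a temporary table through the data sources API.
// The only required option is `path`; subdirectories of the form `key=value/` (with an
// integer value) are discovered as partitions.
sqlContext.sql("""
  CREATE TEMPORARY TABLE orcTable
  USING org.apache.spark.sql.hive.orc
  OPTIONS (
    path '/path/to/people.orc'
  )
""")

// The temporary table can then be queried like any other table.
sqlContext.sql("SELECT name FROM orcTable WHERE age >= 13 AND age <= 19")
  .map(t => "Name: " + t(0)).collect().foreach(println)
{% endhighlight %}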
+ ## JSON Datasets
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 31dc5a58e68e..2ad960afb9e7 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -284,7 +284,7 @@ package object dsl { object plans { // scalastyle:ignore implicit class DslLogicalPlan(val logicalPlan: LogicalPlan) extends LogicalPlanFunctions { - def writeToFile(path: String) = WriteToFile(path, logicalPlan) + def writeToFile(path: String) = WriteToPaquetFile(path, logicalPlan) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 00bdf108a839..a749a2142732 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -124,7 +124,7 @@ case class CreateTableAsSelect[T]( override lazy val resolved = (databaseName != None && childrenResolved) } -case class WriteToFile( +case class WriteToPaquetFile( path: String, child: LogicalPlan) extends UnaryNode { override def output = child.output diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala index fd5f4abcbcd6..25ea91b5f384 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala @@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike { @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match { // For various commands (like DDL) and queries with side effects, we force query optimization to // happen right away to let these side effects take place eagerly. - case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: WriteToFile => + case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: WriteToPaquetFile => LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext) case _ => baseLogicalPlan @@ -73,7 +73,7 @@ private[sql] trait SchemaRDDLike { * @group schema */ def saveAsParquetFile(path: String): Unit = { - sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd + sqlContext.executePlan(WriteToPaquetFile(path, logicalPlan)).toRdd } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 1225d18857af..a062d6659907 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -198,8 +198,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object ParquetOperations extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - // TODO: need to support writing to other types of files. Unify the below code paths. 
- case logical.WriteToFile(path, child) => + case logical.WriteToPaquetFile(path, child) => val relation = ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext) // Note: overwrite=false because otherwise the metadata we just created will be deleted diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 0e36852ddd9b..01b7104a669f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -29,7 +29,7 @@ import scala.util.Try import com.google.common.cache.CacheBuilder import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path} +import org.apache.hadoop.fs.{FileSystem, BlockLocation, FileStatus, Path} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat => NewFileOutputFormat} @@ -287,7 +287,8 @@ case class InsertIntoParquetTable( 1 } else { FileSystemHelper - .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1 + .findMaxTaskId( + NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration, "parquet") + 1 } def writeShard(context: TaskContext, iter: Iterator[Row]): Int = { @@ -381,6 +382,26 @@ private[parquet] class FilteringParquetRowInputFormat } } + override def listStatus(jobContext: JobContext): JList[FileStatus] = { + val conf = ContextUtil.getConfiguration(jobContext) + val paths = NewFileInputFormat.getInputPaths(jobContext) + if (paths.isEmpty) { return new ArrayList[FileStatus]() } + val fs = paths.head.getFileSystem(conf) + val statuses = new ArrayList[FileStatus] + paths.foreach { p => + val cached = FilteringParquetRowInputFormat.statusCache.getIfPresent(p) + if (cached == null) { + logWarning(s"Status cache miss for $p") + val res = fs.listStatus(p).filterNot(_.getPath.getName.startsWith("_")) + res.foreach(statuses.add) + FilteringParquetRowInputFormat.statusCache.put(p, res) + } else { + cached.foreach(statuses.add) + } + } + statuses + } + override def getFooters(jobContext: JobContext): JList[Footer] = { import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.footerCache @@ -590,6 +611,10 @@ private[parquet] class FilteringParquetRowInputFormat } private[parquet] object FilteringParquetRowInputFormat { + val statusCache = CacheBuilder.newBuilder() + .maximumSize(20000) + .build[Path, Array[FileStatus]]() + private val footerCache = CacheBuilder.newBuilder() .maximumSize(20000) .build[FileStatus, Footer]() @@ -600,7 +625,7 @@ private[parquet] object FilteringParquetRowInputFormat { .build[FileStatus, Array[BlockLocation]]() } -private[parquet] object FileSystemHelper { +private[sql] object FileSystemHelper { def listFiles(pathStr: String, conf: Configuration): Seq[Path] = { val origPath = new Path(pathStr) val fs = origPath.getFileSystem(conf) @@ -616,19 +641,37 @@ private[parquet] object FileSystemHelper { fs.listStatus(path).map(_.getPath) } - /** - * Finds the maximum taskid in the output file names at the given path. 
- */ - def findMaxTaskId(pathStr: String, conf: Configuration): Int = { + /** + * List files with special extension + */ + def listFiles(origPath: Path, conf: Configuration, extension: String): Seq[Path] = { + val fs = origPath.getFileSystem(conf) + if (fs == null) { + throw new IllegalArgumentException( + s"Path $origPath is incorrectly formatted") + } + val path = origPath.makeQualified(fs) + if (fs.exists(path)) { + fs.listStatus(path).map(_.getPath).filter(p => p.getName.endsWith(extension)) + } else { + Seq.empty + } + } + + /** + * Finds the maximum taskid in the output file names at the given path. + */ + def findMaxTaskId(pathStr: String, conf: Configuration, extension: String): Int = { + // filename pattern is part-r-.$extension + require(Seq("orc", "parquet").contains(extension), s"Unsupported extension: $extension") + val nameP = new scala.util.matching.Regex(s"""part-r-(\\d{1,}).$extension""", "taskid") val files = FileSystemHelper.listFiles(pathStr, conf) - // filename pattern is part-r-.parquet - val nameP = new scala.util.matching.Regex("""part-r-(\d{1,}).parquet""", "taskid") val hiddenFileP = new scala.util.matching.Regex("_.*") files.map(_.getName).map { case nameP(taskid) => taskid.toInt case hiddenFileP() => 0 case other: String => - sys.error("ERROR: attempting to append to set of Parquet files and found file" + + sys.error(s"ERROR: attempting to append to set of $extension files and found file" + s"that does not match name pattern: $other") case _ => 0 }.reduceLeft((a, b) => if (a < b) b else a) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 304b9a73ee91..daa43e35fd68 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -22,7 +22,8 @@ import java.sql.{Date, Timestamp} import scala.collection.JavaConversions._ import scala.language.implicitConversions -import scala.reflect.runtime.universe.TypeTag +import scala.reflect.runtime.universe.{TypeTag, typeTag} +import scala.Some import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.conf.HiveConf @@ -33,13 +34,18 @@ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable} import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.hive.orc.OrcSchemaRDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators, OverrideCatalog, OverrideFunctionRegistry} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution._ import org.apache.spark.sql.catalyst.types.DecimalType import org.apache.spark.sql.catalyst.types.decimal.Decimal -import org.apache.spark.sql.execution.{ExtractPythonUdfs, QueryExecutionException, Command => PhysicalCommand} +import org.apache.spark.sql.execution.{Command => PhysicalCommand} +import org.apache.spark.sql.catalyst.plans.logical.SetCommand +import org.apache.spark.sql.catalyst.plans.logical.NativeCommand import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand import org.apache.spark.sql.sources.DataSourceStrategy @@ -105,6 +111,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1") def hql(hqlQuery: String): SchemaRDD = 
hiveql(hqlQuery) + /** + * Creates a SchemaRDD from an RDD of case classes. + * + * @group userf + */ + implicit override def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) = { + SparkPlan.currentContext.set(self) + val attributeSeq = ScalaReflection.attributesFor[A] + val schema = StructType.fromAttributes(attributeSeq) + val rowRDD = RDDConversions.productToRowRdd(rdd, schema) + new OrcSchemaRDD(this, + LogicalRDD(ScalaReflection.attributesFor[A], rowRDD)(self)) + } + /** * Creates a table using the schema of the given class. * @@ -116,6 +136,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting) } + /** + * Loads a ORC file, returning the result as a [[SchemaRDD]]. + * + * @group userf + */ + def orcFile(path: String): SchemaRDD = new SchemaRDD( + this, orc.OrcRelation(Seq.empty, path, Some(sparkContext.hadoopConfiguration), this)) + /** * Analyzes the given table in the current database to generate statistics, which will be * used in query optimizations. @@ -344,6 +372,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { HiveCommandStrategy(self), TakeOrdered, ParquetOperations, + OrcOperations, InMemoryScans, ParquetConversion, // Must be before HiveTableScans HiveTableScans, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 56fc85239e1c..90473a5dc9f1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -17,19 +17,22 @@ package org.apache.spark.sql.hive +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.parse.ASTNode +import org.apache.hadoop.mapreduce.Job import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate import org.apache.spark.sql.catalyst.planning._ -import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.types.StringType import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan} import org.apache.spark.sql.hive import org.apache.spark.sql.hive.execution._ +import org.apache.spark.sql.hive.orc._ import org.apache.spark.sql.parquet.ParquetRelation import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy} @@ -232,4 +235,52 @@ private[hive] trait HiveStrategies { case _ => Nil } } + + object OrcOperations extends Strategy { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case WriteToOrcFile(path, child) => + val relation = + OrcRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext) + InsertIntoOrcTable(relation, planLater(child), overwrite = true) :: Nil + case logical.InsertIntoTable(table: OrcRelation, partition, child, overwrite) => + InsertIntoOrcTable(table, planLater(child), overwrite) :: Nil + case PhysicalOperation(projectList, filters, relation: OrcRelation) => + val prunePushedDownFilters = { + OrcRelation.jobConf = sparkContext.hadoopConfiguration + if (ORC_FILTER_PUSHDOWN_ENABLED) { + val job = new Job(OrcRelation.jobConf) + val conf: Configuration = job.getConfiguration + logInfo("Orc push down filter enabled:" + filters) 
+ (filters: Seq[Expression]) => { + val recordFilter = OrcFilters.createFilter(filters) + if (recordFilter.isDefined) { + + logInfo("Parsed filters:" + recordFilter) + /** + * To test it, we can set follows so that the reader + * will not read whole file if small + * sparkContext.hadoopConfiguration.setInt( + * "mapreduce.input.fileinputformat.split.maxsize", 50) + */ + conf.set(SARG_PUSHDOWN, toKryo(recordFilter.get)) + conf.setBoolean("hive.optimize.index.filter", true) + OrcRelation.jobConf = conf + } + // no matter whether it is filtered or not in orc, + // we need to do more fine grained filter + // in the upper layer, return all of them + filters + } + } else { + identity[Seq[Expression]] _ + } + } + pruneFilterProject( + projectList, + filters, + prunePushedDownFilters, + OrcTableScan(_, relation, None)) :: Nil + case _ => Nil + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala new file mode 100644 index 000000000000..faaa00c59a02 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.orc + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder +import org.apache.spark.Logging + +private[sql] object OrcFilters extends Logging { + + def createFilter(expr: Seq[Expression]): Option[SearchArgument] = { + if (expr == null || expr.size == 0) return None + var sarg: Option[Builder] = Some(SearchArgument.FACTORY.newBuilder()) + sarg.get.startAnd() + expr.foreach { + x => { + sarg match { + case Some(s1) => sarg = createFilter(x, s1) + case _ => None + } + } + } + sarg match { + case Some(b) => Some(b.end.build) + case _ => None + } + } + + def createFilter(expression: Expression, builder: Builder): Option[Builder] = { + expression match { + case p@And(left: Expression, right: Expression) => { + val b1 = builder.startAnd() + val b2 = createFilter(left, b1) + b2 match { + case Some(b) => val b3 = createFilter(right, b) + if (b3.isDefined) { + Some(b3.get.end) + } else { + None + } + case _ => None + } + } + case p@Or(left: Expression, right: Expression) => { + val b1 = builder.startOr() + val b2 = createFilter(left, b1) + b2 match { + case Some(b) => val b3 = createFilter(right, b) + if (b3.isDefined) { + Some(b3.get.end) + } else { + None + } + case _ => None + } + } + case p@EqualTo(left: Literal, right: NamedExpression) => { + val b1 = builder.equals(right.name, left.value) + Some(b1) + } + case p@EqualTo(left: NamedExpression, right: Literal) => { + val b1 = builder.equals(left.name, right.value) + Some(b1) + } + case p@LessThan(left: NamedExpression, right: Literal) => { + val b1 = builder.lessThan(left.name, right.value) + Some(b1) + } + case p@LessThan(left: Literal, right: NamedExpression) => { + val b1 = builder.startNot().lessThanEquals(right.name, left.value).end() + Some(b1) + } + case p@LessThanOrEqual(left: NamedExpression, right: Literal) => { + val b1 = builder.lessThanEquals(left.name, right.value) + Some(b1) + } + case p@LessThanOrEqual(left: Literal, right: NamedExpression) => { + val b1 = builder.startNot().lessThan(right.name, left.value).end() + Some(b1) + } + case p@GreaterThan(left: NamedExpression, right: Literal) => { + val b1 = builder.startNot().lessThanEquals(left.name, right.value).end() + Some(b1) + } + case p@GreaterThan(left: Literal, right: NamedExpression) => { + val b1 = builder.lessThanEquals(right.name, left.value) + Some(b1) + } + case p@GreaterThanOrEqual(left: NamedExpression, right: Literal) => { + val b1 = builder.startNot().lessThan(left.name, right.value).end() + Some(b1) + } + case p@GreaterThanOrEqual(left: Literal, right: NamedExpression) => { + val b1 = builder.lessThan(right.name, left.value) + Some(b1) + } + // TODO: test it + case p@EqualNullSafe(left: NamedExpression, right: NamedExpression) => { + val b1 = builder.nullSafeEquals(left.name, right.name) + Some(b1) + } + case p@In(left: NamedExpression, list: Seq[Literal]) => { + val b1 = builder.in(left.name, list.map(_.value).toArray) + Some(b1) + } + case _ => None + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala new file mode 100644 index 000000000000..73ad0a1cf7ae --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.orc + +import java.util.Properties +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.permission.FsAction +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector +import org.apache.hadoop.hive.ql.io.orc._ + +import org.apache.spark.sql.catalyst.plans.logical.{Statistics, LogicalPlan, LeafNode} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedException, MultiInstanceRelation} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.hive.HiveMetastoreTypes +import org.apache.spark.sql.parquet.FileSystemHelper +import org.apache.spark.sql.SQLContext + +import scala.collection.JavaConversions._ + +private[sql] case class OrcRelation( + attributes: Seq[Attribute], + path: String, + @transient conf: Option[Configuration], + @transient sqlContext: SQLContext, + partitioningAttributes: Seq[Attribute] = Nil) + extends LeafNode with MultiInstanceRelation { + self: Product => + + val prop: Properties = new Properties + + override lazy val output = attributes ++ OrcFileOperator.orcSchema(path, conf, prop) + + // TODO: use statistics in ORC file + override lazy val statistics = Statistics(sizeInBytes = sqlContext.defaultSizeInBytes) + + override def newInstance() = + OrcRelation(attributes, path, conf, sqlContext).asInstanceOf[this.type] +} + +private[sql] object OrcRelation { + /** + * Creates a new OrcRelation and underlying Orcfile for the given LogicalPlan. Note that + * this is used inside [[org.apache.spark.sql.execution.SparkStrategies]] to + * create a resolved relation as a data sink for writing to a Orcfile. + * + * @param pathString The directory the ORCfile will be stored in. + * @param child The child node that will be used for extracting the schema. + * @param conf A configuration to be used. + * @return An empty OrcRelation with inferred metadata. 
+ */ + def create( + pathString: String, + child: LogicalPlan, + conf: Configuration, + sqlContext: SQLContext): OrcRelation = { + if (!child.resolved) { + throw new UnresolvedException[LogicalPlan]( + child, + "Attempt to create Orc table from unresolved child") + } + val path = checkPath(pathString, false, conf) + new OrcRelation(child.output, path.toString, Some(conf), sqlContext) + } + + private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = { + require(pathStr != null, "Unable to create OrcRelation: path is null") + val origPath = new Path(pathStr) + val fs = origPath.getFileSystem(conf) + require(fs != null, s"Unable to create OrcRelation: incorrectly formatted path $pathStr") + val path = origPath.makeQualified(fs) + if (!allowExisting) { + require(!fs.exists(path), s"File $pathStr already exists.") + } + if (fs.exists(path)) { + require(fs.getFileStatus(path).getPermission.getUserAction.implies(FsAction.READ_WRITE), + s"Unable to create OrcRelation: path $path not read-writable") + } + path + } + var jobConf: Configuration = _ +} + +private[sql] object OrcFileOperator{ + def getMetaDataReader(origPath: Path, configuration: Option[Configuration]): Reader = { + val conf = configuration.getOrElse(new Configuration()) + val fs: FileSystem = origPath.getFileSystem(conf) + val orcFiles = FileSystemHelper.listFiles(origPath, conf, ".orc") + if (orcFiles == Seq.empty) { + // should return null when write to orc file + return null + } + OrcFile.createReader(fs, orcFiles(0)) + } + + def orcSchema( + path: String, + conf: Option[Configuration], + prop: Properties): Seq[Attribute] = { + // get the schema info through ORC Reader + val origPath = new Path(path) + val reader = getMetaDataReader(origPath, conf) + if (reader == null) { + // return empty seq when saveAsOrcFile + return Seq.empty + } + val inspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector] + // data types that is inspected by this inspector + val schema = inspector.getTypeName + // set prop here, initial OrcSerde need it + val fields = inspector.getAllStructFieldRefs + val (columns, columnTypes) = fields.map { f => + f.getFieldName -> f.getFieldObjectInspector.getTypeName + }.unzip + prop.setProperty("columns", columns.mkString(",")) + prop.setProperty("columns.types", columnTypes.mkString(":")) + + HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType].toAttributes + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala new file mode 100644 index 000000000000..314b45b4a14c --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.hive.orc + +import java.io.IOException +import java.text.SimpleDateFormat +import java.util.{Locale, Date} + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, FileOutputCommitter} +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.hadoop.io.{Writable, NullWritable} +import org.apache.hadoop.mapreduce.{TaskID, TaskAttemptContext, Job} +import org.apache.hadoop.mapred.{Reporter, JobConf} +import org.apache.hadoop.hive.ql.io.orc.{OrcSerde, OrcInputFormat, OrcOutputFormat} +import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils +import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, TypeInfo} + +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.parquet.FileSystemHelper +import org.apache.spark.{Logging, TaskContext, SerializableWritable} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode => LogicalUnaryNode} +import org.apache.spark.sql.catalyst.types.StructType +import org.apache.spark.sql.execution.UnaryNode +import org.apache.spark.sql.hive.{HiveInspectors, HiveShim, HiveMetastoreTypes, HadoopTableReader} + +import scala.collection.JavaConversions._ +import org.apache.spark.mapred.SparkHadoopMapRedUtil + +/** + * logical plan of writing to ORC file + */ +case class WriteToOrcFile( + path: String, + child: LogicalPlan) extends LogicalUnaryNode { + def output = child.output +} + +/** + * orc table scan operator. Imports the file that backs the given + * [[org.apache.spark.sql.hive.orc.OrcRelation]] as a ``RDD[Row]``. 
+ */ +case class OrcTableScan( + output: Seq[Attribute], + relation: OrcRelation, + columnPruningPred: Option[Expression]) + extends LeafNode { + + @transient + lazy val serde: OrcSerde = initSerde + + private def initSerde(): OrcSerde = { + val serde = new OrcSerde + serde.initialize(null, relation.prop) + serde + } + + override def execute(): RDD[Row] = { + val sc = sqlContext.sparkContext + val job = new Job(OrcRelation.jobConf) // sc.hadoopConfiguration) + val conf: Configuration = job.getConfiguration + relation.path.split(",").foreach { curPath => + val qualifiedPath = { + val path = new Path(curPath) + path.getFileSystem(conf).makeQualified(path) + } + FileInputFormat.addInputPath(job, qualifiedPath) + } + + addColumnIds(output, relation, conf) + val inputClass = classOf[OrcInputFormat].asInstanceOf[ + Class[_ <: org.apache.hadoop.mapred.InputFormat[NullWritable, Writable]]] + + // use SpecificMutableRow to decrease GC garbage + val mutableRow = new SpecificMutableRow(output.map(_.dataType)) + val attrsWithIndex = output.zipWithIndex + val rowRdd = sc.hadoopRDD[NullWritable, Writable](conf.asInstanceOf[JobConf], inputClass, + classOf[NullWritable], classOf[Writable]).map(_._2).mapPartitions { iter => + val deserializer = serde + HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow) + } + rowRdd + } + + /** + * add column ids and names + * @param output + * @param relation + * @param conf + */ + def addColumnIds(output: Seq[Attribute], relation: OrcRelation, conf: Configuration) { + + val ids = + output.map(a => + relation.output.indexWhere(_.name == a.name): Integer) + .filter(_ >= 0) + val names = output.map(_.name) + assert(ids.size == names.size, "columns id and name length does not match!") + val sorted = ids.zip(names).sorted + HiveShim.appendReadColumns(conf, sorted.map(_._1), sorted.map(_._2)) + } +} + +/** + * Operator that acts as a sink for queries on RDDs and can be used to + * store the output inside a directory of ORC files. This operator + * is similar to Hive's INSERT INTO TABLE operation in the sense that + * one can choose to either overwrite or append to a directory. Note + * that consecutive insertions to the same table must have compatible + * (source) schemas. + */ +private[sql] case class InsertIntoOrcTable( + relation: OrcRelation, + child: SparkPlan, + overwrite: Boolean = false) + extends UnaryNode with SparkHadoopMapRedUtil with HiveInspectors with Logging { + + override def output = child.output + + @transient val sc = sqlContext.sparkContext + + @transient lazy val orcSerde = initSerde + + private def initSerde(): OrcSerde = { + val serde: OrcSerde = new OrcSerde + serde.initialize(null, relation.prop) + serde + } + + /** + * Inserts all rows into the Orc file. 
+ */ + override def execute() = { + val childRdd = child.execute() + assert(childRdd != null) + + val job = new Job(sqlContext.sparkContext.hadoopConfiguration) + val conf = job.getConfiguration + + val fspath = new Path(relation.path) + val fs = fspath.getFileSystem(conf) + + if (overwrite) { + try { + fs.delete(fspath, true) + } catch { + case e: IOException => + throw new IOException( + s"Unable to clear output directory ${fspath.toString} prior" + + s" to InsertIntoOrcTable:\n${e.toString}") + } + } + val structType = StructType.fromAttributes(relation.output) + // get Type String to build typeInfo + val orcSchema = HiveMetastoreTypes.toMetastoreType(structType) + + val writableRdd = childRdd.mapPartitions { iter => + val typeInfo: TypeInfo = + TypeInfoUtils.getTypeInfoFromTypeString(orcSchema) + val standardOI = TypeInfoUtils + .getStandardJavaObjectInspectorFromTypeInfo(typeInfo) + .asInstanceOf[StructObjectInspector] + val fieldOIs = standardOI + .getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val outputData = new Array[Any](fieldOIs.length) + val wrappers = fieldOIs.map(wrapperFor) + iter.map { row => + var i = 0 + while (i < row.length) { + outputData(i) = wrappers(i)(row(i)) + i += 1 + } + orcSerde.serialize(outputData, standardOI) + } + } + + saveAsHadoopFile(writableRdd, relation.path, conf) + + // We return the child RDD to allow chaining (alternatively, one could return nothing). + childRdd + } + + private def saveAsHadoopFile( + rdd: RDD[Writable], + path: String, + @transient conf: Configuration) { + val job = new Job(conf) + val keyType = classOf[Void] + job.setOutputKeyClass(keyType) + job.setOutputValueClass(classOf[Writable]) + FileOutputFormat.setOutputPath(job, new Path(path)) + + val wrappedConf = new SerializableWritable(job.getConfiguration) + + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + val jobtrackerID = formatter.format(new Date()) + val stageId = sqlContext.sparkContext.newRddId() + + val taskIdOffset = + if (overwrite) { + 1 + } else { + FileSystemHelper + .findMaxTaskId( + FileOutputFormat.getOutputPath(job).toString, job.getConfiguration, "orc") + 1 + } + + def getWriter( + outFormat: OrcOutputFormat, + conf: Configuration, + path: Path, + reporter: Reporter) = { + val fs = path.getFileSystem(conf) + + outFormat.getRecordWriter(fs, conf.asInstanceOf[JobConf], path.toUri.getPath, reporter). + asInstanceOf[org.apache.hadoop.mapred.RecordWriter[NullWritable, Writable]] + } + + def getCommitterAndWriter(offset: Int, context: TaskAttemptContext) = { + val outFormat = new OrcOutputFormat + + val taskId: TaskID = context.getTaskAttemptID.getTaskID + val partition: Int = taskId.getId + val filename = s"part-r-${partition + offset}.orc" + val output: Path = FileOutputFormat.getOutputPath(context) + val committer = new FileOutputCommitter(output, context) + val path = new Path(committer.getWorkPath, filename) + val writer = getWriter(outFormat, wrappedConf.value, path, Reporter.NULL) + (committer, writer) + } + + def writeShard(context: TaskContext, iter: Iterator[Writable]): Int = { + // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it + // around by taking a mod. We expect that no task will be attempted 2 billion times. 
+ val attemptNumber = (context.attemptId % Int.MaxValue).toInt + /* "reduce task" */ + val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId, + attemptNumber) + val hadoopContext = newTaskAttemptContext(wrappedConf.value.asInstanceOf[JobConf], attemptId) + val workerAndComitter = getCommitterAndWriter(taskIdOffset, hadoopContext) + val writer = workerAndComitter._2 + + while (iter.hasNext) { + val row = iter.next() + writer.write(NullWritable.get(), row) + } + + writer.close(Reporter.NULL) + workerAndComitter._1.commitTask(hadoopContext) + return 1 + } + + /* apparently we need a TaskAttemptID to construct an OutputCommitter; + * however we're only going to use this local OutputCommitter for + * setupJob/commitJob, so we just use a dummy "map" task. + */ + val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0) + val jobTaskContext = newTaskAttemptContext( + wrappedConf.value.asInstanceOf[JobConf], jobAttemptId) + val workerAndComitter = getCommitterAndWriter(taskIdOffset, jobTaskContext) + workerAndComitter._1.setupJob(jobTaskContext) + sc.runJob(rdd, writeShard _) + workerAndComitter._1.commitJob(jobTaskContext) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/orc.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/orc.scala new file mode 100644 index 000000000000..ee0d5dc08973 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/orc.scala @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.orc + +import java.util.{Locale, Properties} +import scala.collection.JavaConversions._ + +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils +import org.apache.hadoop.io.{NullWritable, Writable} +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path, FileStatus, FileSystem} +import org.apache.hadoop.hive.ql.io.orc.Reader +import org.apache.hadoop.hive.ql.io.orc.{OrcFile, OrcInputFormat, OrcSerde} +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector + +import org.apache.spark.sql.sources.{CatalystScan, BaseRelation, RelationProvider} +import org.apache.spark.sql.SQLContext +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.expressions.{SpecificMutableRow, Attribute, Expression, Row} +import org.apache.spark.sql.hive.{HiveShim, HadoopTableReader, HiveMetastoreTypes} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.parquet.FileSystemHelper + +/** + * Allows creation of orc based tables using the syntax + * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.orc`. + * Currently the only option required is `path`, which should be the location of a collection of, + * optionally partitioned, parquet files. + */ +class DefaultSource extends RelationProvider { + /** Returns a new base relation with the given parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation = { + val path = + parameters.getOrElse("path", sys.error("'path' must be specified for orc tables.")) + + OrcRelation2(path)(sqlContext) + } +} +private[orc] case class Partition(partitionValues: Map[String, Any], files: Seq[FileStatus]) + +/** + * Partitioning: Partitions are auto discovered and must be in the form of directories `key=value/` + * located at `path`. Currently only a single partitioning column is supported and it must + * be an integer. This class supports both fully self-describing data, which contains the partition + * key, and data where the partition key is only present in the folder structure. The presence + * of the partitioning key in the data is also auto-detected. The `null` partition is not yet + * supported. + * + * Metadata: The metadata is automatically discovered by reading the first orc file present. + * There is currently no support for working with files that have different schema. Additionally, + * when parquet metadata caching is turned on, the FileStatus objects for all data will be cached + * to improve the speed of interactive querying. When data is added to a table it must be dropped + * and recreated to pick up any changes. + * + * Statistics: Statistics for the size of the table are automatically populated during metadata + * discovery. 
+ */ +@DeveloperApi +case class OrcRelation2(path: String)(@transient val sqlContext: SQLContext) + extends CatalystScan with Logging { + + @transient + lazy val serde: OrcSerde = initSerde + + val prop: Properties = new Properties + + private def initSerde(): OrcSerde = { + val serde = new OrcSerde + serde.initialize(null, prop) + serde + } + + def sparkContext = sqlContext.sparkContext + + // Minor Hack: scala doesnt seem to respect @transient for vals declared via extraction + @transient + private var partitionKeys: Seq[String] = _ + @transient + private var partitions: Seq[Partition] = _ + discoverPartitions() + + // TODO: Only finds the first partition, assumes the key is of type Integer... + // Why key type is Int? + private def discoverPartitions() = { + val fs = FileSystem.get(new java.net.URI(path), sparkContext.hadoopConfiguration) + val partValue = "([^=]+)=([^=]+)".r + + val childrenOfPath = fs.listStatus(new Path(path)) + .filterNot(_.getPath.getName.startsWith("_")) + .filterNot(_.getPath.getName.startsWith(".")) + + val childDirs = childrenOfPath.filter(s => s.isDir) + + if (childDirs.size > 0) { + val partitionPairs = childDirs.map(_.getPath.getName).map { + case partValue(key, value) => (key, value) + } + + val foundKeys = partitionPairs.map(_._1).distinct + if (foundKeys.size > 1) { + sys.error(s"Too many distinct partition keys: $foundKeys") + } + + // Do a parallel lookup of partition metadata. + val partitionFiles = + childDirs.par.map { d => + fs.listStatus(d.getPath) + // TODO: Is there a standard hadoop function for this? + .filterNot(_.getPath.getName.startsWith("_")) + .filterNot(_.getPath.getName.startsWith(".")) + }.seq + + partitionKeys = foundKeys.toSeq + partitions = partitionFiles.zip(partitionPairs).map { case (files, (key, value)) => + Partition(Map(key -> value.toInt), files) + }.toSeq + } else { + partitionKeys = Nil + partitions = Partition(Map.empty, childrenOfPath) :: Nil + } + } + + override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum + + private def getMetaDataReader(origPath: Path, configuration: Option[Configuration]): Reader = { + val conf = configuration.getOrElse(new Configuration()) + val fs: FileSystem = origPath.getFileSystem(conf) + val orcFiles = FileSystemHelper.listFiles(origPath, conf, ".orc") + if (orcFiles == Seq.empty) { + // should return null when write to orc file + return null + } + OrcFile.createReader(fs, orcFiles(0)) + } + + private def orcSchema( + path: Path, + conf: Option[Configuration], + prop: Properties): StructType = { + // get the schema info through ORC Reader + val reader = getMetaDataReader(path, conf) + if (reader == null) { + // return empty seq when saveAsOrcFile + return StructType(Seq.empty) + } + val inspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector] + // data types that is inspected by this inspector + val schema = inspector.getTypeName + // set prop here, initial OrcSerde need it + val fields = inspector.getAllStructFieldRefs + val (columns, columnTypes) = fields.map { f => + f.getFieldName -> f.getFieldObjectInspector.getTypeName + }.unzip + prop.setProperty("columns", columns.mkString(",")) + prop.setProperty("columns.types", columnTypes.mkString(":")) + + HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType] + } + + val dataSchema = orcSchema( + partitions.head.files.head.getPath, + Some(sparkContext.hadoopConfiguration), + prop) + + val dataIncludesKey = + partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true) + + override val 
schema = + if (dataIncludesKey) { + dataSchema + } else { + StructType(dataSchema.fields :+ StructField(partitionKeys.head, IntegerType)) + } + + override def buildScan(output: Seq[Attribute], predicates: Seq[Expression]): RDD[Row] = { + val sc = sparkContext + val job = new Job(sc.hadoopConfiguration) + val conf: Configuration = job.getConfiguration + // todo: predicates push down + path.split(",").foreach { curPath => + val qualifiedPath = { + val path = new Path(curPath) + path.getFileSystem(conf).makeQualified(path) + } + FileInputFormat.addInputPath(job, qualifiedPath) + } + + addColumnIds(output, schema.toAttributes, conf) + val inputClass = classOf[OrcInputFormat].asInstanceOf[ + Class[_ <: org.apache.hadoop.mapred.InputFormat[NullWritable, Writable]]] + + // use SpecificMutableRow to decrease GC garbage + val mutableRow = new SpecificMutableRow(output.map(_.dataType)) + val attrsWithIndex = output.zipWithIndex + val rowRdd = sc.hadoopRDD[NullWritable, Writable](conf.asInstanceOf[JobConf], inputClass, + classOf[NullWritable], classOf[Writable]).map(_._2).mapPartitions { iter => + val deserializer = serde + HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow) + } + rowRdd + } + + /** + * add column ids and names + * @param output + * @param relationOutput + * @param conf + */ + def addColumnIds(output: Seq[Attribute], relationOutput: Seq[Attribute], conf: Configuration) { + val names = output.map(_.name) + val fieldIdMap = relationOutput.map(_.name).zipWithIndex.toMap + val ids = output.map { att => + val realName = att.name.toLowerCase(Locale.ENGLISH) + fieldIdMap.getOrElse(realName, -1) + }.filter(_ >= 0).map(_.asInstanceOf[Integer]) + + assert(ids.size == names.size, "columns id and name length does not match!") + if (ids != null && !ids.isEmpty) { + HiveShim.appendReadColumns(conf, ids, names) + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala new file mode 100644 index 000000000000..ee48b37ff4e4 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.io.Output +import org.apache.commons.codec.binary.Base64 +import org.apache.spark.sql.{SQLContext, SchemaRDD} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector} +import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} +import org.apache.spark.sql.catalyst.expressions.Row +/* Implicit conversions */ +import scala.collection.JavaConversions._ +package object orc { + class OrcSchemaRDD( + @transient val sqlContext1: SQLContext, + @transient val baseLogicalPlan1: LogicalPlan) + extends SchemaRDD(sqlContext1, baseLogicalPlan1) { + /** + * Saves the contents of this `SchemaRDD` as a ORC file, preserving the schema. Files that + * are written out using this method can be read back in as a SchemaRDD using the `orcFile` + * function. + * Note: you can only use it in HiveContext + * + * @group schema + */ + def saveAsOrcFile(path: String): Unit = { + sqlContext.executePlan(WriteToOrcFile(path, logicalPlan)).toRdd + } + } + + // for orc compression type, only take effect in hive 0.13.1 + val orcDefaultCompressVar = "hive.exec.orc.default.compress" + // for prediction push down in hive-0.13.1, don't enable it + val ORC_FILTER_PUSHDOWN_ENABLED = true + val SARG_PUSHDOWN = "sarg.pushdown" + + def toKryo(input: Any) = { + val out = new Output(4 * 1024, 10 * 1024 * 1024); + new Kryo().writeObject(out, input); + out.close(); + Base64.encodeBase64String(out.toBytes()); + } +} diff --git a/sql/hive/src/test/resources/data/files/orcfiles/_SUCCESS b/sql/hive/src/test/resources/data/files/orcfiles/_SUCCESS new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sql/hive/src/test/resources/data/files/orcfiles/part-r-1.orc b/sql/hive/src/test/resources/data/files/orcfiles/part-r-1.orc new file mode 100644 index 000000000000..3188e14c1fa4 Binary files /dev/null and b/sql/hive/src/test/resources/data/files/orcfiles/part-r-1.orc differ diff --git a/sql/hive/src/test/resources/data/files/orcfiles/part-r-2.orc b/sql/hive/src/test/resources/data/files/orcfiles/part-r-2.orc new file mode 100644 index 000000000000..525488226e2a Binary files /dev/null and b/sql/hive/src/test/resources/data/files/orcfiles/part-r-2.orc differ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/ORCQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/ORCQuerySuite.scala new file mode 100644 index 000000000000..684252c1960a --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/ORCQuerySuite.scala @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.orc + +import java.util.Properties +import java.io.File +import org.scalatest.BeforeAndAfterAll +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.catalyst.util.getTempFilePath +import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.util.Utils +import org.apache.hadoop.hive.ql.io.orc.CompressionKind + +case class TestRDDEntry(key: Int, value: String) + +case class NullReflectData( + intField: java.lang.Integer, + longField: java.lang.Long, + floatField: java.lang.Float, + doubleField: java.lang.Double, + booleanField: java.lang.Boolean) + +case class OptionalReflectData( + intField: Option[Int], + longField: Option[Long], + floatField: Option[Float], + doubleField: Option[Double], + booleanField: Option[Boolean]) + +case class Nested(i: Int, s: String) + +case class Data(array: Seq[Int], nested: Nested) + +case class Contact(name: String, phone: String) + +case class Person(name: String, age: Int, contacts: Seq[Contact]) + +case class AllDataTypes( + stringField: String, + intField: Int, + longField: Long, + floatField: Float, + doubleField: Double, + shortField: Short, + byteField: Byte, + booleanField: Boolean) + +case class AllDataTypesWithNonPrimitiveType( + stringField: String, + intField: Int, + longField: Long, + floatField: Float, + doubleField: Double, + shortField: Short, + byteField: Byte, + booleanField: Boolean, + array: Seq[Int], + arrayContainsNull: Seq[Option[Int]], + map: Map[Int, Long], + mapValueContainsNull: Map[Int, Option[Long]], + data: Data) + +case class BinaryData(binaryData: Array[Byte]) + +class OrcQuerySuite extends QueryTest with BeforeAndAfterAll { + + test("Read/Write All Types") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + val range = (0 to 255) + val data = sparkContext + .parallelize(range) + .map(x => AllDataTypes( + s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0)) + + data.saveAsOrcFile(tempDir) + checkAnswer( + TestHive.orcFile(tempDir), + data.toSchemaRDD.collect().toSeq) + + Utils.deleteRecursively(new File(tempDir)) + } + + test("read/write binary data") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + val range = (0 to 3) + sparkContext.parallelize(range) + .map(x => BinaryData(s"test$x".getBytes("utf8"))).saveAsOrcFile(tempDir) + + TestHive.orcFile(tempDir) + .map(r => new String(r(0).asInstanceOf[Array[Byte]], "utf8")) + .collect().toSet == Set("test0", "test1", "test2", "test3") + Utils.deleteRecursively(new File(tempDir)) + } + + test("read/right empty RDD") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + val range = (0 to 255) + val data = sparkContext + .parallelize(range) + .map(x => AllDataTypes( + s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0)) + .filter(_.intField > 500) + data.saveAsOrcFile(tempDir) + checkAnswer( + TestHive.orcFile(tempDir), + data.toSchemaRDD.collect().toSeq) + + Utils.deleteRecursively(new File(tempDir)) + } + + test("Read/Write All Types with non-primitive type") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + val range = (0 to 255) + val data = sparkContext.parallelize(range) + .map(x => AllDataTypesWithNonPrimitiveType( + s"$x", x, x.toLong, 
x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0, + (0 until x), + (0 until x).map(Option(_).filter(_ % 3 == 0)), + (0 until x).map(i => i -> i.toLong).toMap, + (0 until x).map(i => i -> Option(i.toLong)).toMap + (x -> None), + Data((0 until x), Nested(x, s"$x")))) + data.saveAsOrcFile(tempDir) + + checkAnswer( + TestHive.orcFile(tempDir), + data.toSchemaRDD.collect().toSeq) + Utils.deleteRecursively(new File(tempDir)) + } + + test("Creating case class RDD table") { + sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + .registerTempTable("tmp") + val rdd = sql("SELECT * FROM tmp").collect().sortBy(_.getInt(0)) + var counter = 1 + rdd.foreach { + // '===' does not like string comparison? + row => { + assert(row.getString(1).equals(s"val_$counter"), s"row $counter value ${row.getString(1)} does not match val_$counter") + counter = counter + 1 + } + } + } + + test("Simple selection form orc table") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + val data = sparkContext.parallelize((1 to 10)) + .map(i => Person(s"name_$i", i, (0 until 2).map{ m=> + Contact(s"contact_$m", s"phone_$m") })) + data.saveAsOrcFile(tempDir) + val f = TestHive.orcFile(tempDir) + f.registerTempTable("tmp") + var rdd = sql("SELECT name FROM tmp where age <= 5") + assert(rdd.count() == 5) + + rdd = sql("SELECT name, contacts FROM tmp where age > 5") + assert(rdd.count() == 5) + + val contacts = rdd.flatMap(t=>t(1).asInstanceOf[Seq[_]]) + assert(contacts.count() == 10) + Utils.deleteRecursively(new File(tempDir)) + } + + test("save and load case class RDD with Nones as orc") { + val data = OptionalReflectData(None, None, None, None, None) + val rdd = sparkContext.parallelize(data :: data :: data :: Nil) + val tempDir = getTempFilePath("orcTest").getCanonicalPath + rdd.saveAsOrcFile(tempDir) + val readFile = TestHive.orcFile(tempDir) + val rdd_saved = readFile.collect() + assert(rdd_saved(0) === Seq.fill(5)(null)) + Utils.deleteRecursively(new File(tempDir)) + } + + test("Compression options for writing to a Orcfile") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + val rdd = TestHive.sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + + // test default compression codec, now only support zlib + rdd.saveAsOrcFile(tempDir) + val actualCodec = OrcFileOperator.getMetaDataReader(new Path(tempDir), Some(new Configuration())).getCompression.name + assert(actualCodec == "ZLIB") + + Utils.deleteRecursively(new File(tempDir)) + } + + test("Get ORC Schema with ORC Reader") { + val path = "src/test/resources/data/files/orcfiles" + val attributes = OrcFileOperator.orcSchema(path, Some(TestHive.sparkContext.hadoopConfiguration), new Properties()) + assert(attributes(0).dataType == StringType) + assert(attributes(1).dataType == IntegerType) + assert(attributes(2).dataType == LongType) + assert(attributes(3).dataType == FloatType) + assert(attributes(4).dataType == DoubleType) + assert(attributes(5).dataType == ShortType) + assert(attributes(6).dataType == ByteType) + assert(attributes(7).dataType == BooleanType) + } + + ignore("Other Compression options for writing to an Orcfile only supported in hive 0.13.1 and above") { + TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "SNAPPY") + var tempDir = getTempFilePath("orcTest").getCanonicalPath + val rdd = sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + rdd.saveAsOrcFile(tempDir) + var actualCodec = OrcFileOperator.getMetaDataReader(new Path(tempDir), 
+      Some(new Configuration())).getCompression
+    assert(actualCodec == CompressionKind.SNAPPY)
+    Utils.deleteRecursively(new File(tempDir))
+
+    TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "NONE")
+    tempDir = getTempFilePath("orcTest").getCanonicalPath
+    rdd.saveAsOrcFile(tempDir)
+    actualCodec = OrcFileOperator.getMetaDataReader(new Path(tempDir),
+      Some(new Configuration())).getCompression
+    assert(actualCodec == CompressionKind.NONE)
+    Utils.deleteRecursively(new File(tempDir))
+
+    TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "LZO")
+    tempDir = getTempFilePath("orcTest").getCanonicalPath
+    rdd.saveAsOrcFile(tempDir)
+    actualCodec = OrcFileOperator.getMetaDataReader(new Path(tempDir),
+      Some(new Configuration())).getCompression
+    assert(actualCodec == CompressionKind.LZO)
+    Utils.deleteRecursively(new File(tempDir))
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
new file mode 100644
index 000000000000..f192b03ad9e7
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.hive.test.TestHive._
+
+// The data where the partitioning key exists only in the directory structure.
+case class OrcData(intField: Int, stringField: String)
+
+// The data that also includes the partitioning key.
+case class OrcDataWithKey(p: Int, intField: Int, stringField: String)
+
+/**
+ * A collection of tests for ORC data with various forms of partitioning.
+ */
+abstract class OrcTest extends QueryTest with BeforeAndAfterAll {
+  var partitionedTableDir: File = null
+  var partitionedTableDirWithKey: File = null
+
+  override def beforeAll(): Unit = {
+    partitionedTableDir = File.createTempFile("orctests", "sparksql")
+    partitionedTableDir.delete()
+    partitionedTableDir.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDir, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => OrcData(i, s"part-$p"))
+        .saveAsOrcFile(partDir.getCanonicalPath)
+    }
+
+    partitionedTableDirWithKey = File.createTempFile("orctests", "sparksql")
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithKey.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKey, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => OrcDataWithKey(p, i, s"part-$p"))
+        .saveAsOrcFile(partDir.getCanonicalPath)
+    }
+  }
+
+  Seq("partitioned_orc", "partitioned_orc_with_key").foreach { table =>
+    test(s"project the partitioning column $table") {
+      checkAnswer(
+        sql(s"SELECT p, count(*) FROM $table GROUP BY p"),
+        (1, 10) ::
+        (2, 10) ::
+        (3, 10) ::
+        (4, 10) ::
+        (5, 10) ::
+        (6, 10) ::
+        (7, 10) ::
+        (8, 10) ::
+        (9, 10) ::
+        (10, 10) :: Nil
+      )
+    }
+
+    test(s"project partitioning and non-partitioning columns $table") {
+      checkAnswer(
+        sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
+        ("part-1", 1, 10) ::
+        ("part-2", 2, 10) ::
+        ("part-3", 3, 10) ::
+        ("part-4", 4, 10) ::
+        ("part-5", 5, 10) ::
+        ("part-6", 6, 10) ::
+        ("part-7", 7, 10) ::
+        ("part-8", 8, 10) ::
+        ("part-9", 9, 10) ::
+        ("part-10", 10, 10) :: Nil
+      )
+    }
+
+    test(s"simple count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table"),
+        100)
+    }
+
+    test(s"pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
+        10)
+    }
+
+    test(s"multi-partition pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
+        30)
+    }
+
+    test(s"non-partition predicates $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
+        30)
+    }
+
+    test(s"sum $table") {
+      checkAnswer(
+        sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
+        1 + 2 + 3)
+    }
+
+    test(s"hive udfs $table") {
+      checkAnswer(
+        sql(s"SELECT concat(stringField, stringField) FROM $table"),
+        sql(s"SELECT stringField FROM $table").map {
+          case Row(s: String) => Row(s + s)
+        }.collect().toSeq)
+    }
+  }
+
+  test("count(*) on non-partitioned table") {
+    checkAnswer(
+      sql("SELECT COUNT(*) FROM normal_orc"),
+      10)
+  }
+}
+
+// Registers the partitioned and non-partitioned ORC directories written above as temporary
+// tables through the data source API, so the shared tests in OrcTest can query them.
+class OrcSourceSuite extends OrcTest {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sql(s"""
+      CREATE TEMPORARY TABLE partitioned_orc
+      USING org.apache.spark.sql.hive.orc
+      OPTIONS (
+        path '${partitionedTableDir.getCanonicalPath}'
+      )
+    """)
+
+    sql(s"""
+      CREATE TEMPORARY TABLE partitioned_orc_with_key
+      USING org.apache.spark.sql.hive.orc
+      OPTIONS (
+        path '${partitionedTableDirWithKey.getCanonicalPath}'
+      )
+    """)
+
+    sql(s"""
+      CREATE TEMPORARY TABLE normal_orc
+      USING org.apache.spark.sql.hive.orc
+      OPTIONS (
+        path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+      )
+    """)
+  }
+}