
Commit 1505af4

fix according to comments and move orc to hive sub project
1 parent 655b23f commit 1505af4

File tree

14 files changed: +274 -469 lines changed

examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala

Lines changed: 0 additions & 11 deletions
@@ -59,27 +59,16 @@ object RDDRelation {
     // Write out an RDD as a parquet file.
     rdd.saveAsParquetFile("pair.parquet")

-    // Write out an RDD as a orc file.
-    rdd.saveAsOrcFile("pair.orc")
-
-
     // Read in parquet file. Parquet files are self-describing so the schmema is preserved.
     val parquetFile = sqlContext.parquetFile("pair.parquet")

-    // Read in orc file. orc files are self-describing so the schmema is preserved.
-    val orcFile = sqlContext.orcFile("pair.orc")
-
-
     // Queries can be run using the DSL on parequet files just like the original RDD.
     parquetFile.where('key === 1).select('value as 'a).collect().foreach(println)

     // These files can also be registered as tables.
     parquetFile.registerTempTable("parquetFile")
     sql("SELECT * FROM parquetFile").collect().foreach(println)

-    orcFile.registerTempTable("orcFile")
-    sql("SELECT * FROM orcFile").collect().foreach(println)
-
     sc.stop()
   }
 }

examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala

Lines changed: 10 additions & 0 deletions
@@ -62,6 +62,16 @@ object HiveFromSpark {
     println("Result of SELECT *:")
     sql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println)

+    // Write out an RDD as a orc file.
+    rdd.saveAsOrcFile("pair.orc")
+
+    // Read in orc file. Orc files are self-describing so the schmema is preserved.
+    val orcFile = hiveContext.orcFile("pair.orc")
+
+    // These files can also be registered as tables.
+    orcFile.registerTempTable("orcFile")
+    sql("SELECT * FROM records r JOIN orcFile s ON r.key = s.key").collect().foreach(println)
+
     sc.stop()
   }
 }

sql/core/pom.xml

Lines changed: 0 additions & 15 deletions
@@ -53,21 +53,6 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.spark-project.hive</groupId>
-      <artifactId>hive-exec</artifactId>
-      <version>${hive.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>commons-logging</groupId>
-          <artifactId>commons-logging</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-api</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 0 additions & 7 deletions
@@ -35,7 +35,6 @@ private[spark] object SQLConf {
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
   val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
   val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
-  val ORC_COMPRESSION = "spark.sql.orc.compression.codec"

   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
@@ -84,12 +83,6 @@ private[sql] trait SQLConf {
   /** The compression codec for writing to a Parquetfile */
   private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION, "snappy")

-  /** The compression codec for writing to a Orcfile
-   * Note: only support zlib now since we use ```OrcOutputFormat.getRecordWriter``` ,which is not
-   * allowed to configure thr compression kind
-   */
-  private[spark] def orcCompressionCodec: String = getConf(ORC_COMPRESSION, "zlib")
-
   /** The number of rows that will be */
   private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt
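
With ORC_COMPRESSION and orcCompressionCodec gone, the Parquet codec is the only compression setting left in this trait. A minimal sketch of setting and reading that key through the conf API of this era, assuming a local SQLContext (the app name and master are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ParquetCodecConfSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative local setup; any SparkContext works here.
    val sc = new SparkContext(new SparkConf().setAppName("ParquetCodecConfSketch").setMaster("local"))
    val sqlContext = new SQLContext(sc)

    // Overrides the "snappy" default that parquetCompressionCodec falls back to above.
    sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")

    // Reads the value back through the same SQLConf map.
    println(sqlContext.getConf("spark.sql.parquet.compression.codec"))

    sc.stop()
  }
}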

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 45 deletions
@@ -36,8 +36,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.SparkStrategies
 import org.apache.spark.sql.json._
 import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.sql.orc.OrcRelation
+import org.apache.spark.SparkContext

 /**
  * :: AlphaComponent ::
@@ -148,14 +147,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def parquetFile(path: String): SchemaRDD =
     new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))

-  /**
-   * Loads a Orc file, returning the result as a [[SchemaRDD]].
-   *
-   * @group userf
-   */
-  def orcFile(path: String): SchemaRDD =
-    new SchemaRDD(this, orc.OrcRelation(path, Some(sparkContext.hadoopConfiguration), this))
-
   /**
    * Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].
    * It goes through the entire dataset once to determine the schema.
@@ -255,40 +246,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
       path, ScalaReflection.attributesFor[A], allowExisting, conf, this))
   }

-  /**
-   * :: Experimental ::
-   * Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
-   * This registered table can be used as the target of future `insertInto` operations.
-   *
-   * {{{
-   *   val sqlContext = new SQLContext(...)
-   *   import sqlContext._
-   *
-   *   case class Person(name: String, age: Int)
-   *   createOrcFile[Person]("path/to/file.orc").registerTempTable("people")
-   *   sql("INSERT INTO people SELECT 'michael', 29")
-   * }}}
-   *
-   * @tparam A A case class type that describes the desired schema of the parquet file to be
-   *           created.
-   * @param path The path where the directory containing parquet metadata should be created.
-   *             Data inserted into this table will also be stored at this location.
-   * @param allowExisting When false, an exception will be thrown if this directory already exists.
-   * @param conf A Hadoop configuration object that can be used to specify options to the parquet
-   *             output format.
-   *
-   * @group userf
-   */
-  @Experimental
-  def createOrcFile[A <: Product : TypeTag](
-      path: String,
-      allowExisting: Boolean = true,
-      conf: Configuration = new Configuration()): SchemaRDD = {
-    new SchemaRDD(
-      this,
-      OrcRelation.createEmpty(path, ScalaReflection.attributesFor[A], allowExisting, conf, this))
-  }
-
   /**
    * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
    * during the lifetime of this instance of SQLContext.
@@ -334,7 +291,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
       HashJoin ::
       InMemoryScans ::
       ParquetOperations ::
-      OrcOperations::
       BasicOperators ::
       CartesianProduct ::
       BroadcastNestedLoopJoin :: Nil
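
The deleted createOrcFile mirrored the createParquetFile method that stays in SQLContext (its tail is the context at the top of the third hunk). A minimal sketch of that remaining Parquet variant, following the usage pattern from the removed scaladoc; the Person class and path are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Illustrative schema for the empty parquet table.
case class Person(name: String, age: Int)

object CreateParquetFileSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CreateParquetFileSketch").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext._

    // Create an empty, schema-bearing parquet directory and register it as a table,
    // just as the removed createOrcFile example did for ORC.
    createParquetFile[Person]("people.parquet").registerTempTable("people")
    sql("INSERT INTO people SELECT 'michael', 29")

    sc.stop()
  }
}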

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala

Lines changed: 2 additions & 2 deletions
@@ -18,9 +18,8 @@
 package org.apache.spark.sql

 import org.apache.spark.annotation.{DeveloperApi, Experimental}
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.SparkLogicalPlan
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.LogicalRDD

 /**
@@ -81,6 +80,7 @@ private[sql] trait SchemaRDDLike {
   * Saves the contents of this `SchemaRDD` as a orc file, preserving the schema. Files that
   * are written out using this method can be read back in as a SchemaRDD using the `orcFile`
   * function.
+  * Note: you can only use it in HiveContext
   *
   * @group schema
   */
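
Since the added note restricts saveAsOrcFile to a HiveContext, here is a minimal sketch of the intended round trip, modeled on the HiveFromSpark change above; the Record class, path, and master are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Illustrative schema for the ORC round trip.
case class Record(key: Int, value: String)

object OrcRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("OrcRoundTripSketch").setMaster("local"))
    val hiveContext = new HiveContext(sc)
    import hiveContext._

    // saveAsOrcFile is defined on SchemaRDDLike, but per the note it only works with a HiveContext.
    val rdd = sc.parallelize(1 to 10).map(i => Record(i, s"val_$i"))
    rdd.saveAsOrcFile("pair.orc")

    // ORC files carry their schema, so they can be read back and registered directly.
    val orcFile = hiveContext.orcFile("pair.orc")
    orcFile.registerTempTable("orcFile")
    sql("SELECT * FROM orcFile WHERE key = 1").collect().foreach(println)

    sc.stop()
  }
}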

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 0 additions & 22 deletions
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
 import org.apache.spark.sql.parquet._
-import org.apache.spark.sql.orc.{OrcTableScan, InsertIntoOrcTable, OrcRelation}

 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>
@@ -238,27 +237,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }

-  object OrcOperations extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.WriteToOrcFile(path, child) =>
-        val relation =
-          OrcRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
-        InsertIntoOrcTable(relation, planLater(child), overwrite=true) :: Nil
-      case logical.InsertIntoOrcTable(table: OrcRelation, partition, child, overwrite) =>
-        InsertIntoOrcTable(table, planLater(child), overwrite) :: Nil
-      case PhysicalOperation(projectList, filters, relation: OrcRelation) =>
-        // TODO: need to implement predict push down.
-        val prunePushedDownFilters = identity[Seq[Expression]] _
-        pruneFilterProject(
-          projectList,
-          filters,
-          prunePushedDownFilters,
-          OrcTableScan(_, relation, None)) :: Nil
-
-      case _ => Nil
-    }
-  }
-
   object InMemoryScans extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>
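
The removed OrcOperations follows the common SparkStrategies shape: pattern-match the logical plan, emit physical operators, and return Nil so the planner can fall through to the next strategy. A toy, self-contained sketch of that shape with stand-in types (not the real catalyst classes):

object StrategySketch {
  // Stand-in plan nodes; the real code uses catalyst LogicalPlan and SparkPlan.
  sealed trait LogicalPlan
  case class WriteToFile(path: String, child: LogicalPlan) extends LogicalPlan
  case class Scan(table: String) extends LogicalPlan

  sealed trait PhysicalPlan
  case class InsertIntoTable(path: String, child: PhysicalPlan) extends PhysicalPlan
  case class TableScan(table: String) extends PhysicalPlan

  trait Strategy { def apply(plan: LogicalPlan): Seq[PhysicalPlan] }

  object FileOperations extends Strategy {
    def apply(plan: LogicalPlan): Seq[PhysicalPlan] = plan match {
      case WriteToFile(path, Scan(table)) => InsertIntoTable(path, TableScan(table)) :: Nil
      case Scan(table)                    => TableScan(table) :: Nil
      case _                              => Nil // let another strategy handle it
    }
  }

  def main(args: Array[String]): Unit = {
    val strategies: Seq[Strategy] = FileOperations :: Nil
    val plan: LogicalPlan = WriteToFile("pair.orc", Scan("records"))
    // The planner tries each strategy in order and takes the first non-empty result.
    val physical = strategies.view.map(_(plan)).find(_.nonEmpty).getOrElse(Nil)
    println(physical)
  }
}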

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 31 additions & 9 deletions
@@ -283,7 +283,7 @@ case class InsertIntoParquetTable(
       1
     } else {
       FileSystemHelper
-        .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
+        .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration, "parquet") + 1
     }

   def writeShard(context: TaskContext, iter: Iterator[Row]): Int = {
@@ -488,7 +488,7 @@ private[parquet] object FilteringParquetRowInputFormat {
     .build[FileStatus, Array[BlockLocation]]()
 }

-private[parquet] object FileSystemHelper {
+private[sql] object FileSystemHelper {
   def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
     val origPath = new Path(pathStr)
     val fs = origPath.getFileSystem(conf)
@@ -504,19 +504,41 @@ private[parquet] object FileSystemHelper {
     fs.listStatus(path).map(_.getPath)
   }

-  /**
-   * Finds the maximum taskid in the output file names at the given path.
-   */
-  def findMaxTaskId(pathStr: String, conf: Configuration): Int = {
+  /**
+   * List files with special extension
+   */
+  def listFiles(origPath: Path, conf: Configuration, extension: String): Seq[Path] = {
+    val fs = origPath.getFileSystem(conf)
+    if (fs == null) {
+      throw new IllegalArgumentException(
+        s"OrcTableOperations: Path $origPath is incorrectly formatted")
+    }
+    val path = origPath.makeQualified(fs)
+    if (fs.exists(path) && fs.getFileStatus(path).isDir) {
+      fs.listStatus(path).map(_.getPath).filter(p => p.getName.endsWith(extension))
+    } else {
+      Seq.empty
+    }
+  }
+
+  /**
+   * Finds the maximum taskid in the output file names at the given path.
+   */
+  def findMaxTaskId(pathStr: String, conf: Configuration, extension: String): Int = {
     val files = FileSystemHelper.listFiles(pathStr, conf)
-    // filename pattern is part-r-<int>.parquet
-    val nameP = new scala.util.matching.Regex("""part-r-(\d{1,}).parquet""", "taskid")
+    // filename pattern is part-r-<int>.$extension
+    val nameP = extension match {
+      case "parquet" => new scala.util.matching.Regex( """part-r-(\d{1,}).parquet""", "taskid")
+      case "orc" => new scala.util.matching.Regex( """part-r-(\d{1,}).orc""", "taskid")
+      case _ =>
+        sys.error(s"ERROR: unsupported extension: $extension")
+    }
     val hiddenFileP = new scala.util.matching.Regex("_.*")
     files.map(_.getName).map {
       case nameP(taskid) => taskid.toInt
       case hiddenFileP() => 0
       case other: String => {
-        sys.error("ERROR: attempting to append to set of Parquet files and found file" +
+        sys.error(s"ERROR: attempting to append to set of $extension files and found file" +
           s"that does not match name pattern: $other")
         0
       }
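
To make the extension-aware findMaxTaskId concrete, here is a standalone sketch of the same regex-based task-id extraction run over plain file names (an illustrative re-implementation, not the FileSystemHelper object itself):

object FindMaxTaskIdSketch {
  def maxTaskId(fileNames: Seq[String], extension: String): Int = {
    // Filename pattern is part-r-<int>.<extension>, e.g. part-r-00007.orc
    val nameP = new scala.util.matching.Regex("""part-r-(\d{1,}).""" + extension, "taskid")
    val hiddenFileP = new scala.util.matching.Regex("_.*")
    fileNames.map {
      case nameP(taskid) => taskid.toInt
      case hiddenFileP() => 0 // metadata files such as _SUCCESS count as task id 0
      case _ => 0
    }.reduceOption(_ max _).getOrElse(0)
  }

  def main(args: Array[String]): Unit = {
    val files = Seq("_SUCCESS", "part-r-00001.orc", "part-r-00007.orc")
    // InsertIntoParquetTable adds 1 to this value to pick the next task id when appending.
    println(maxTaskId(files, "orc")) // prints 7
  }
}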

0 comments