@@ -106,6 +106,8 @@ class SqlParser extends StandardTokenParsers {
protected val IF = Keyword("IF")
protected val IN = Keyword("IN")
protected val INNER = Keyword("INNER")
protected val INSERT = Keyword("INSERT")
protected val INTO = Keyword("INTO")
protected val IS = Keyword("IS")
protected val JOIN = Keyword("JOIN")
protected val LEFT = Keyword("LEFT")
@@ -114,6 +116,7 @@ class SqlParser extends StandardTokenParsers {
protected val NULL = Keyword("NULL")
protected val ON = Keyword("ON")
protected val OR = Keyword("OR")
protected val OVERWRITE = Keyword("OVERWRITE")
protected val LIKE = Keyword("LIKE")
protected val RLIKE = Keyword("RLIKE")
protected val REGEXP = Keyword("REGEXP")
@@ -162,7 +165,7 @@ class SqlParser extends StandardTokenParsers {
select * (
UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
) | insert

protected lazy val select: Parser[LogicalPlan] =
SELECT ~> opt(DISTINCT) ~ projections ~
@@ -185,6 +188,13 @@ class SqlParser extends StandardTokenParsers {
withLimit
}

protected lazy val insert: Parser[LogicalPlan] =
INSERT ~> opt(OVERWRITE) ~ inTo ~ select <~ opt(";") ^^ {
case o ~ r ~ s =>
val overwrite: Boolean = o.getOrElse("") == "OVERWRITE"
InsertIntoTable(r, Map[String, Option[String]](), s, overwrite)
}

protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")

protected lazy val projection: Parser[Expression] =
@@ -195,6 +205,8 @@ class SqlParser extends StandardTokenParsers {

protected lazy val from: Parser[LogicalPlan] = FROM ~> relations

protected lazy val inTo: Parser[LogicalPlan] = INTO ~> relation

// Based very loosely on the MySQL Grammar.
// http://dev.mysql.com/doc/refman/5.0/en/join.html
protected lazy val relations: Parser[LogicalPlan] =
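
Taken together, the grammar changes above let the parser accept an INSERT ... SELECT statement with an optional OVERWRITE between INSERT and INTO. A rough usage sketch (the table names "dest" and "src" are hypothetical and assume a SQLContext whose catalog can resolve both):

    // Sketch only: with the rule INSERT ~> opt(OVERWRITE) ~ inTo ~ select,
    // both statements below parse; the second one sets overwrite = true.
    sqlContext.sql("INSERT INTO dest SELECT key, value FROM src")
    sqlContext.sql("INSERT OVERWRITE INTO dest SELECT key, value FROM src")
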
@@ -31,19 +31,33 @@ trait Catalog {
alias: Option[String] = None): LogicalPlan

def registerTable(databaseName: Option[String], tableName: String, plan: LogicalPlan): Unit

def unregisterTable(databaseName: Option[String], tableName: String): Unit

def unregisterAllTables(): Unit
}

class SimpleCatalog extends Catalog {
val tables = new mutable.HashMap[String, LogicalPlan]()

def registerTable(databaseName: Option[String],tableName: String, plan: LogicalPlan): Unit = {
override def registerTable(
databaseName: Option[String],
tableName: String,
plan: LogicalPlan): Unit = {
tables += ((tableName, plan))
}

def unregisterTable(databaseName: Option[String], tableName: String) = { tables -= tableName }
override def unregisterTable(
databaseName: Option[String],
tableName: String) = {
tables -= tableName
}

override def unregisterAllTables() = {
tables.clear()
}

def lookupRelation(
override def lookupRelation(
databaseName: Option[String],
tableName: String,
alias: Option[String] = None): LogicalPlan = {
@@ -92,6 +106,10 @@ trait OverrideCatalog extends Catalog {
override def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
overrides.remove((databaseName, tableName))
}

override def unregisterAllTables(): Unit = {
overrides.clear()
}
}

/**
@@ -113,4 +131,6 @@ object EmptyCatalog extends Catalog {
def unregisterTable(databaseName: Option[String], tableName: String): Unit = {
throw new UnsupportedOperationException
}

override def unregisterAllTables(): Unit = {}
}
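
The Catalog trait now also requires unregisterAllTables(). A minimal sketch of the new method on SimpleCatalog (somePlan stands in for any LogicalPlan; the name is a placeholder):

    // Sketch: register two tables, then drop everything at once.
    val catalog = new SimpleCatalog
    catalog.registerTable(None, "t1", somePlan)
    catalog.registerTable(None, "t2", somePlan)
    catalog.unregisterAllTables()   // the underlying tables map is now empty
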
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -80,12 +80,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))

/**
* Loads a parequet file, returning the result as a [[SchemaRDD]].
* Loads a Parquet file, returning the result as a [[SchemaRDD]].
*
* @group userf
*/
def parquetFile(path: String): SchemaRDD =
new SchemaRDD(this, parquet.ParquetRelation("ParquetFile", path))
new SchemaRDD(this, parquet.ParquetRelation(path))


/**
@@ -171,10 +171,10 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// TODO: need to support writing to other types of files. Unify the below code paths.
case logical.WriteToFile(path, child) =>
val relation =
ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, None)
InsertIntoParquetTable(relation, planLater(child))(sparkContext) :: Nil
ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)
InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child))(sparkContext) :: Nil
InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
case PhysicalOperation(projectList, filters, relation: ParquetRelation) =>
// TODO: Should be pushing down filters as well.
pruneFilterProject(
@@ -17,45 +17,44 @@

package org.apache.spark.sql.parquet

import java.io.{IOException, FileNotFoundException}

import scala.collection.JavaConversions._
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsAction
import org.apache.hadoop.mapreduce.Job

import parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
import parquet.hadoop.util.ContextUtil
import parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter}
import parquet.hadoop.{ParquetOutputFormat, Footer, ParquetFileWriter, ParquetFileReader}
import parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
import parquet.io.api.{Binary, RecordConsumer}
import parquet.schema.{Type => ParquetType, PrimitiveType => ParquetPrimitiveType, MessageType, MessageTypeParser}
import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
import parquet.schema.Type.Repetition
import parquet.schema.{MessageType, MessageTypeParser}
import parquet.schema.{PrimitiveType => ParquetPrimitiveType}
import parquet.schema.{Type => ParquetType}

import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
import org.apache.spark.sql.catalyst.types._

// Implicits
import scala.collection.JavaConversions._

/**
* Relation that consists of data stored in a Parquet columnar format.
*
* Users should interact with parquet files though a SchemaRDD, created by a [[SQLContext]] instead
* of using this class directly.
*
* {{{
* val parquetRDD = sqlContext.parquetFile("path/to/parequet.file")
* val parquetRDD = sqlContext.parquetFile("path/to/parquet.file")
* }}}
*
* @param tableName The name of the relation that can be used in queries.
* @param path The path to the Parquet file.
*/
case class ParquetRelation(tableName: String, path: String)
extends BaseRelation with MultiInstanceRelation {
private[sql] case class ParquetRelation(val path: String)
extends LeafNode with MultiInstanceRelation {
self: Product =>

/** Schema derived from ParquetFile */
def parquetSchema: MessageType =
@@ -65,33 +64,59 @@ case class ParquetRelation(tableName: String, path: String)
.getSchema

/** Attributes */
val attributes =
override val output =
ParquetTypesConverter
.convertToAttributes(parquetSchema)
.convertToAttributes(parquetSchema)

/** Output */
override val output = attributes

// Parquet files have no concepts of keys, therefore no Partitioner
// Note: we could allow Block level access; needs to be thought through
override def isPartitioned = false

override def newInstance = ParquetRelation(tableName, path).asInstanceOf[this.type]
override def newInstance = ParquetRelation(path).asInstanceOf[this.type]

// Equals must also take into account the output attributes so that we can distinguish between
// different instances of the same relation,
override def equals(other: Any) = other match {
case p: ParquetRelation =>
p.tableName == tableName && p.path == path && p.output == output
p.path == path && p.output == output
case _ => false
}
}

object ParquetRelation {
private[sql] object ParquetRelation {

def enableLogForwarding() {
// Note: Parquet does not use forwarding to parent loggers which
// is required for the JUL-SLF4J bridge to work. Also there is
// a default logger that appends to Console which needs to be
// reset.
import org.slf4j.bridge.SLF4JBridgeHandler
import java.util.logging.Logger
import java.util.logging.LogManager

val loggerNames = Seq(
"parquet.hadoop.ColumnChunkPageWriteStore",
"parquet.hadoop.InternalParquetRecordWriter",
"parquet.hadoop.ParquetRecordReader",
"parquet.hadoop.ParquetInputFormat",
"parquet.hadoop.ParquetOutputFormat",
"parquet.hadoop.ParquetFileReader",
"parquet.hadoop.InternalParquetRecordReader",
"parquet.hadoop.codec.CodecConfig")
LogManager.getLogManager.reset()
SLF4JBridgeHandler.install()
for(name <- loggerNames) {
val logger = Logger.getLogger(name)
logger.setParent(Logger.getGlobal)
logger.setUseParentHandlers(true)
}
}

// The element type for the RDDs that this relation maps to.
type RowType = org.apache.spark.sql.catalyst.expressions.GenericMutableRow

// The compression type
type CompressionType = parquet.hadoop.metadata.CompressionCodecName

// The default compression
val defaultCompression = CompressionCodecName.GZIP
Contributor:
Not for this PR, but this should probably be an argument of WriteToFile or ParquetRelation or something.

Contributor (Author):
Good point, I was thinking the same but did not want to introduce lots of Parquet dependencies on the logical plan side. Maybe there should be some Spark system level compression types.

Contributor:
Hi @AndreSchumacher and @marmbrus, it seems we can use a Hadoop config property to change this defaultCompression; there is a check in the createEmpty method:

    if (conf.get(ParquetOutputFormat.COMPRESSION) == null) {
      conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name())
    }

but it is a Hadoop config property, not a Spark config property, so we cannot simply set it in conf/spark-defaults.conf.
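
If that observation holds, one workaround (a sketch, not part of this PR) would be to set the Hadoop property on the SparkContext's Hadoop configuration before writing, instead of in conf/spark-defaults.conf; SNAPPY here is just an example codec name:

    // Sketch: ParquetOutputFormat.COMPRESSION is the Hadoop property that the
    // createEmpty check reads; setting it beforehand overrides the GZIP default.
    import parquet.hadoop.ParquetOutputFormat
    sparkContext.hadoopConfiguration.set(ParquetOutputFormat.COMPRESSION, "SNAPPY")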


/**
* Creates a new ParquetRelation and underlying Parquetfile for the given LogicalPlan. Note that
* this is used inside [[org.apache.spark.sql.execution.SparkStrategies SparkStrategies]] to
@@ -100,24 +125,39 @@ object ParquetRelation {
*
* @param pathString The directory the Parquetfile will be stored in.
* @param child The child node that will be used for extracting the schema.
* @param conf A configuration configuration to be used.
* @param tableName The name of the resulting relation.
* @return An empty ParquetRelation inferred metadata.
* @param conf A configuration to be used.
* @return An empty ParquetRelation with inferred metadata.
*/
def create(pathString: String,
child: LogicalPlan,
conf: Configuration,
tableName: Option[String]): ParquetRelation = {
conf: Configuration): ParquetRelation = {
if (!child.resolved) {
throw new UnresolvedException[LogicalPlan](
child,
"Attempt to create Parquet table from unresolved child (when schema is not available)")
}
createEmpty(pathString, child.output, conf)
}

val name = s"${tableName.getOrElse(child.nodeName)}_parquet"
/**
* Creates an empty ParquetRelation and underlying Parquetfile that only
* consists of the Metadata for the given schema.
*
* @param pathString The directory the Parquetfile will be stored in.
* @param attributes The schema of the relation.
* @param conf A configuration to be used.
* @return An empty ParquetRelation.
*/
def createEmpty(pathString: String,
attributes: Seq[Attribute],
conf: Configuration): ParquetRelation = {
val path = checkPath(pathString, conf)
ParquetTypesConverter.writeMetaData(child.output, path, conf)
new ParquetRelation(name, path.toString)
if (conf.get(ParquetOutputFormat.COMPRESSION) == null) {
conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name())
}
ParquetRelation.enableLogForwarding()
ParquetTypesConverter.writeMetaData(attributes, path, conf)
new ParquetRelation(path.toString)
}

private def checkPath(pathStr: String, conf: Configuration): Path = {
@@ -143,7 +183,7 @@ object ParquetRelation {
}
}

object ParquetTypesConverter {
private[parquet] object ParquetTypesConverter {
def toDataType(parquetType : ParquetPrimitiveTypeName): DataType = parquetType match {
// for now map binary to string type
// TODO: figure out how Parquet uses strings or why we can't use them in a MessageType schema
@@ -242,6 +282,7 @@ object ParquetTypesConverter {
extraMetadata,
"Spark")

ParquetRelation.enableLogForwarding()
ParquetFileWriter.writeMetadataFile(
conf,
path,
@@ -268,16 +309,24 @@ object ParquetTypesConverter {
throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath")
}
val path = origPath.makeQualified(fs)
if (!fs.getFileStatus(path).isDir) {
throw new IllegalArgumentException(
s"Expected $path for be a directory with Parquet files/metadata")
}
ParquetRelation.enableLogForwarding()
val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
// if this is a new table that was just created we will find only the metadata file
if (fs.exists(metadataPath) && fs.isFile(metadataPath)) {
// TODO: improve exception handling, etc.
ParquetFileReader.readFooter(conf, metadataPath)
} else {
if (!fs.exists(path) || !fs.isFile(path)) {
throw new FileNotFoundException(
s"Could not find file ${path.toString} when trying to read metadata")
// there may be one or more Parquet files in the given directory
val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path))
// TODO: for now we assume that all footers (if there is more than one) have identical
// metadata; we may want to add a check here at some point
if (footers.size() == 0) {
throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")
}
ParquetFileReader.readFooter(conf, path)
footers(0).getParquetMetadata
}
}
}