12 changes: 6 additions & 6 deletions dev/deps/spark-deps-hadoop-2.6
@@ -146,13 +146,13 @@ opencsv-2.3.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.6.jar
parquet-column-1.8.2.jar
parquet-common-1.8.2.jar
parquet-encoding-1.8.2.jar
parquet-format-2.3.1.jar
parquet-hadoop-1.8.2.jar
parquet-column-1.8.1.jar
parquet-common-1.8.1.jar
parquet-encoding-1.8.1.jar
parquet-format-2.3.0-incubating.jar
parquet-hadoop-1.8.1.jar
parquet-hadoop-bundle-1.6.0.jar
parquet-jackson-1.8.2.jar
parquet-jackson-1.8.1.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
12 changes: 6 additions & 6 deletions dev/deps/spark-deps-hadoop-2.7
@@ -147,13 +147,13 @@ opencsv-2.3.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.6.jar
parquet-column-1.8.2.jar
parquet-common-1.8.2.jar
parquet-encoding-1.8.2.jar
parquet-format-2.3.1.jar
parquet-hadoop-1.8.2.jar
parquet-column-1.8.1.jar
parquet-common-1.8.1.jar
parquet-encoding-1.8.1.jar
parquet-format-2.3.0-incubating.jar
parquet-hadoop-1.8.1.jar
parquet-hadoop-bundle-1.6.0.jar
parquet-jackson-1.8.2.jar
parquet-jackson-1.8.1.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
2 changes: 1 addition & 1 deletion pom.xml
@@ -130,7 +130,7 @@
<!-- Version used for internal directory structure -->
<hive.version.short>1.2.1</hive.version.short>
<derby.version>10.12.1.1</derby.version>
<parquet.version>1.8.2</parquet.version>
<parquet.version>1.8.1</parquet.version>
<hive.parquet.version>1.6.0</hive.parquet.version>
<jetty.version>9.3.11.v20160721</jetty.version>
<javaxservlet.version>3.1.0</javaxservlet.version>
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -582,9 +582,9 @@ class Analyzer(
// |- view2 (defaultDatabase = db2)
// |- view3 (defaultDatabase = db3)
// |- view4 (defaultDatabase = db4)
// In this case, the view `view1` is a nested view, it directly references `table2`, `view2`
// In this case, the view `view1` is a nested view, it directly references `table2`, `view2`
// and `view4`, the view `view2` references `view3`. On resolving the table, we look up the
// relations `table2`, `view2`, `view4` using the default database `db1`, and look up the
// relations `table2`, `view2`, `view4` using the default database `db1`, and look up the
// relation `view3` using the default database `db2`.
//
// Note this is compatible with the views defined by older versions of Spark(before 2.2), which
sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -402,6 +402,13 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
@InterfaceStability.Stable
object StructType extends AbstractDataType {

/**
* A key used in field metadata to indicate that the field comes from the result of merging
* two different StructTypes that do not always contain the field. That is to say, the field
* might be missing (optional) from one of the StructTypes.
*/
private[sql] val metadataKeyForOptionalField = "_OPTIONAL_"

override private[sql] def defaultConcreteType: DataType = new StructType

override private[sql] def acceptsType(other: DataType): Boolean = {
@@ -462,6 +469,8 @@ object StructType extends AbstractDataType {

case (StructType(leftFields), StructType(rightFields)) =>
val newFields = ArrayBuffer.empty[StructField]
// This metadata records the fields that exist in only one of the two StructTypes
val optionalMeta = new MetadataBuilder()

val rightMapped = fieldsMap(rightFields)
leftFields.foreach {
@@ -473,7 +482,8 @@
nullable = leftNullable || rightNullable)
}
.orElse {
Some(leftField)
optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
Some(leftField.copy(metadata = optionalMeta.build()))
}
.foreach(newFields += _)
}
@@ -482,7 +492,8 @@
rightFields
.filterNot(f => leftMapped.get(f.name).nonEmpty)
.foreach { f =>
newFields += f
optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
newFields += f.copy(metadata = optionalMeta.build())
}

StructType(newFields)
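The marking behaviour restored above can be exercised directly. Below is a minimal sketch, assuming access to Spark's `private[sql]` members (e.g. from a suite in the `org.apache.spark.sql.types` package, as in `DataTypeSuite`): a field present on both sides keeps its original metadata, while a field present on only one side is tagged with `metadataKeyForOptionalField`.

```scala
import org.apache.spark.sql.types._

// Sketch only: `merge` and `metadataKeyForOptionalField` are private[sql],
// so this assumes it runs inside Spark's own sql packages (as the tests do).
val left  = StructType(StructField("a", LongType) :: StructField("b", FloatType) :: Nil)
val right = StructType(StructField("a", LongType) :: StructField("c", StringType) :: Nil)

val merged = left.merge(right)

def isOptional(f: StructField): Boolean =
  f.metadata.contains(StructType.metadataKeyForOptionalField) &&
    f.metadata.getBoolean(StructType.metadataKeyForOptionalField)

assert(!isOptional(merged("a")))  // present on both sides: no marker
assert(isOptional(merged("b")))   // only on the left side: marked optional
assert(isOptional(merged("c")))   // only on the right side: marked optional
```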
sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -134,6 +134,55 @@ class DataTypeSuite extends SparkFunSuite {
assert(mapped === expected)
}

test("merge where right is empty") {
val left = StructType(
StructField("a", LongType) ::
StructField("b", FloatType) :: Nil)

val right = StructType(List())
val merged = left.merge(right)

assert(DataType.equalsIgnoreCompatibleNullability(merged, left))
assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
}

test("merge where left is empty") {

val left = StructType(List())

val right = StructType(
StructField("a", LongType) ::
StructField("b", FloatType) :: Nil)

val merged = left.merge(right)

assert(DataType.equalsIgnoreCompatibleNullability(merged, right))
assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
}

test("merge where both are non-empty") {
val left = StructType(
StructField("a", LongType) ::
StructField("b", FloatType) :: Nil)

val right = StructType(
StructField("c", LongType) :: Nil)

val expected = StructType(
StructField("a", LongType) ::
StructField("b", FloatType) ::
StructField("c", LongType) :: Nil)

val merged = left.merge(right)

assert(DataType.equalsIgnoreCompatibleNullability(merged, expected))
assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(merged("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
}

test("merge where right contains type conflict") {
val left = StructType(
StructField("a", LongType) ::
13 changes: 0 additions & 13 deletions sql/core/pom.xml
@@ -133,19 +133,6 @@
<artifactId>parquet-avro</artifactId>
<scope>test</scope>
</dependency>
<!--
This version of avro test-dep is different from the one defined
in the parent pom. The parent pom has avro 1.7.7 test-dep for Hadoop.
Here, ParquetAvroCompatibilitySuite uses parquet-avro's AvroParquetWriter
which uses avro 1.8.0+ specific API. In Maven 3, we need to have
this here to have different versions for the same artifact.
-->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -109,7 +109,9 @@ class ParquetFileFormat

// We want to clear this temporary metadata so it is not saved into the Parquet file.
// This metadata is only useful for detecting optional columns when pushing down filters.
ParquetWriteSupport.setSchema(dataSchema, conf)
val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
dataSchema).asInstanceOf[StructType]
ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)

// Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
// and `CatalystWriteSupport` (writing actual rows to Parquet files).
@@ -295,7 +297,11 @@ class ParquetFileFormat
ParquetWriteSupport.SPARK_ROW_SCHEMA,
ParquetSchemaConverter.checkFieldNames(requiredSchema).json)

ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
// We want to clear this temporary metadata so it is not saved into the Parquet file.
// This metadata is only useful for detecting optional columns when pushing down filters.
val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
requiredSchema).asInstanceOf[StructType]
ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)

// Sets flags for `CatalystSchemaConverter`
hadoopConf.setBoolean(
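For reference, `StructType.removeMetadata` used in both hunks above is a `private[sql]` helper. The stand-in below (hypothetical, not Spark's implementation, and handling top-level fields only, whereas the real helper also recurses into nested types) illustrates what the call sites accomplish: the temporary optional-field marker is stripped before the schema reaches `ParquetWriteSupport`, so it is never persisted into Parquet footers.

```scala
import org.apache.spark.sql.types._

// Hypothetical helper sketching the effect of StructType.removeMetadata
// for top-level fields: drop the temporary marker from each field's metadata.
def stripOptionalMarker(schema: StructType, key: String): StructType =
  StructType(schema.fields.map { f =>
    val cleaned = new MetadataBuilder()
      .withMetadata(f.metadata) // start from the field's existing metadata
      .remove(key)              // remove the marker if present
      .build()
    f.copy(metadata = cleaned)
  })
```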
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -41,6 +41,8 @@ private[parquet] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])

// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
/*
// Binary.fromString and Binary.fromByteArray don't accept null values
case StringType =>
(n: String, v: Any) => FilterApi.eq(
@@ -50,6 +52,7 @@
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
*/
}

private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -64,6 +67,8 @@
case DoubleType =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])

// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
/*
case StringType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Expand All @@ -72,6 +77,7 @@ private[parquet] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
*/
}

private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -84,13 +90,16 @@
case DoubleType =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])

// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
/*
case StringType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
*/
}

private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -103,13 +112,16 @@
case DoubleType =>
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])

// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
/*
case StringType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
*/
}

private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -122,13 +134,16 @@
case DoubleType =>
(n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])

// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
/*
case StringType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
*/
}

private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -141,24 +156,36 @@
case DoubleType =>
(n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])

// See SPARK-17213: https://issues.apache.org/jira/browse/SPARK-17213
/*
case StringType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n),
Binary.fromString(v.asInstanceOf[String]))
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
*/
}

/**
* Returns a map from name of the column to the data type, if predicate push down applies.
* Returns a map from name of the column to the data type, if predicate push down applies
* (i.e. not an optional field).
*
* SPARK-11955: The optional fields will have metadata StructType.metadataKeyForOptionalField.
* These fields only exist on one side of the merged schemas. Because of that, we can't push down
* filters using such fields; otherwise the Parquet library will throw an exception (PARQUET-389).
* Here we filter out such fields.
*/
private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
case StructType(fields) =>
// Here we don't flatten the fields in the nested schema but just look up through
// root fields. Currently, accessing nested fields does not push down filters,
// and creating filters for them is not supported.
fields.map(f => f.name -> f.dataType).toMap
fields.filter { f =>
!f.metadata.contains(StructType.metadataKeyForOptionalField) ||
!f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
}.map(f => f.name -> f.dataType).toMap
case _ => Map.empty[String, DataType]
}

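The situation this guards against can be reproduced at the user level. A minimal sketch, assuming a `SparkSession` named `spark` (e.g. in `spark-shell`) and hypothetical paths and column names: with schema merging enabled, column `c` exists in only some of the files, so pushing a predicate on `c` into parquet-mr would trip PARQUET-389; with the field marked optional, the predicate stays out of the Parquet push-down and is evaluated by Spark instead.

```scala
// Two Parquet directories with overlapping but different schemas.
spark.range(3).selectExpr("id AS a", "id AS b").write.parquet("/tmp/t/part=1")
spark.range(3).selectExpr("id AS a", "id AS c").write.parquet("/tmp/t/part=2")

// Merge the schemas on read; "c" is missing from part=1, so it is an optional field.
val df = spark.read.option("mergeSchema", "true").parquet("/tmp/t")

// Safe: the filter on the optional column "c" is not pushed down to parquet-mr.
df.filter("c = 1").show()
```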
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -559,8 +559,21 @@ private[parquet] class ParquetSchemaConverter(
private[parquet] object ParquetSchemaConverter {
val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"

val EMPTY_MESSAGE: MessageType =
Types.buildMessage().named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
// !! HACK ALERT !!
//
// PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing an empty GroupType,
// which prevents us from selecting no columns at all for queries like `SELECT COUNT(*) FROM t`.
// This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
//
// To work around this problem, here we first construct a `MessageType` with a single dummy
// field, and then remove the field to obtain an empty `MessageType`.
//
// TODO Revert this change after upgrading parquet-mr to 1.8.2+
val EMPTY_MESSAGE = Types
.buildMessage()
.required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
.named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
EMPTY_MESSAGE.getFields.clear()

def checkFieldName(name: String): Unit = {
// ,;{}()\n\t= and space are special characters in Parquet schema
Expand Down