11 changes: 5 additions & 6 deletions dev/deps/spark-deps-hadoop-2.2
@@ -129,14 +129,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
11 changes: 5 additions & 6 deletions dev/deps/spark-deps-hadoop-2.3
@@ -136,14 +136,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
11 changes: 5 additions & 6 deletions dev/deps/spark-deps-hadoop-2.4
@@ -136,14 +136,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
11 changes: 5 additions & 6 deletions dev/deps/spark-deps-hadoop-2.6
@@ -144,14 +144,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
11 changes: 5 additions & 6 deletions dev/deps/spark-deps-hadoop-2.7
@@ -145,14 +145,13 @@ opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.7.0.jar
-parquet-common-1.7.0.jar
-parquet-encoding-1.7.0.jar
+parquet-column-1.8.1.jar
+parquet-common-1.8.1.jar
+parquet-encoding-1.8.1.jar
 parquet-format-2.3.0-incubating.jar
-parquet-generator-1.7.0.jar
-parquet-hadoop-1.7.0.jar
+parquet-hadoop-1.8.1.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.7.0.jar
+parquet-jackson-1.8.1.jar
 pmml-model-1.2.15.jar
 pmml-schema-1.2.15.jar
 protobuf-java-2.5.0.jar
2 changes: 1 addition & 1 deletion pom.xml
@@ -133,7 +133,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>1.2.1</hive.version.short>
     <derby.version>10.11.1.1</derby.version>
-    <parquet.version>1.7.0</parquet.version>
+    <parquet.version>1.8.1</parquet.version>
     <hive.parquet.version>1.6.0</hive.parquet.version>
     <jetty.version>9.2.16.v20160414</jetty.version>
     <javaxservlet.version>3.1.0</javaxservlet.version>
@@ -58,6 +58,8 @@
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types;
 import org.apache.spark.sql.types.StructType;
 
@@ -186,15 +188,19 @@ protected void initialize(String path, List<String> columns) throws IOException
     if (columns == null) {
       this.requestedSchema = fileSchema;
     } else {
-      Types.MessageTypeBuilder builder = Types.buildMessage();
-      for (String s: columns) {
-        if (!fileSchema.containsField(s)) {
-          throw new IOException("Can only project existing columns. Unknown field: " + s +
-            " File schema:\n" + fileSchema);
+      if (columns.size() > 0) {
+        Types.MessageTypeBuilder builder = Types.buildMessage();
+        for (String s: columns) {
+          if (!fileSchema.containsField(s)) {
+            throw new IOException("Can only project existing columns. Unknown field: " + s +
+              " File schema:\n" + fileSchema);
+          }
+          builder.addFields(fileSchema.getType(s));
         }
-        builder.addFields(fileSchema.getType(s));
+        this.requestedSchema = builder.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
+      } else {
+        this.requestedSchema = CatalystSchemaConverter.EMPTY_MESSAGE();
       }
-      this.requestedSchema = builder.named("spark_schema");
     }
     this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema);
     this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns());
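The new `columns.size() > 0` guard exists because a plain count over a Parquet table asks this reader for zero columns. A minimal way to exercise that path, assuming a Spark SQLContext named `sqlContext` and a made-up file location (both are assumptions for illustration, not part of this diff):

  // Hypothetical repro: the path below is an assumption, not from the PR.
  val df = sqlContext.read.parquet("/tmp/people.parquet")

  // COUNT(*) projects no columns, so the reader now falls back to
  // CatalystSchemaConverter.EMPTY_MESSAGE instead of building an empty
  // GroupType, which parquet-mr 1.8.1 rejects.
  println(df.count())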
@@ -109,10 +109,14 @@ private[parquet] object CatalystReadSupport {
    */
   def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
     val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
-    Types
-      .buildMessage()
-      .addFields(clippedParquetFields: _*)
-      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+    if (clippedParquetFields.isEmpty) {
+      CatalystSchemaConverter.EMPTY_MESSAGE
+    } else {
+      Types
+        .buildMessage()
+        .addFields(clippedParquetFields: _*)
+        .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+    }
   }
 
   private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
@@ -538,6 +538,22 @@ private[parquet] class CatalystSchemaConverter(
 private[parquet] object CatalystSchemaConverter {
   val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
 
+  // !! HACK ALERT !!
+  //
+  // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing an empty GroupType,
+  // which prevents us from selecting no columns at all for queries like `SELECT COUNT(*) FROM t`.
+  // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
+  //
+  // To work around this problem, here we first construct a `MessageType` with a single dummy
+  // field, and then remove the field to obtain an empty `MessageType`.
+  //
+  // TODO Revert this change after upgrading parquet-mr to 1.8.2+
+  val EMPTY_MESSAGE = Types
+    .buildMessage()
+    .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
+    .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
+  EMPTY_MESSAGE.getFields.clear()
 
   def checkFieldName(name: String): Unit = {
     // ,;{}()\n\t= and space are special characters in Parquet schema
     checkConversionRequirement(

Review thread on the TODO line above:

Contributor: @rdblue When will Parquet 1.8.2 be released?

Contributor (author): We're close to a 1.9.0 release, just working through some performance issues with the switch to ByteBuffer APIs. No estimate on that yet. We can do a 1.8.2 if there's interest so that we can fix some things like this without pulling in all those changes.
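For reference, the dummy-field trick can be exercised on its own. A minimal sketch, assuming only parquet-mr 1.8.1 on the classpath (the literal schema name stands in for the SPARK_PARQUET_SCHEMA_NAME constant above):

  import org.apache.parquet.schema.{PrimitiveType, Types}

  // Build a message with one dummy INT32 field, then drop the field again,
  // leaving an empty MessageType that 1.8.1 refuses to construct directly.
  val emptyMessage = Types
    .buildMessage()
    .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
    .named("spark_schema")
  emptyMessage.getFields.clear()
  assert(emptyMessage.getFieldCount == 0)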
@@ -22,8 +22,6 @@ import java.io.Serializable
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.OriginalType
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
@@ -53,18 +51,15 @@ private[sql] object ParquetFilters {
     case DoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     // Binary.fromString and Binary.fromByteArray don't accept null values
     case StringType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
-        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
+        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
     case BinaryType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
-        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    */
+        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
   }
 
   private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -79,17 +74,14 @@
     case DoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
-        Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
+        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
     case BinaryType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
-        Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    */
+        Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
   }
 
   private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -102,16 +94,13 @@
     case DoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+          Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    */
+        FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -124,16 +113,13 @@
     case DoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+          Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    */
+        FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -147,15 +133,13 @@
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
     // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+          Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    */
+        FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -168,16 +152,13 @@
     case DoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n),
-          Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
+          Binary.fromString(v.asInstanceOf[String]))
     case BinaryType =>
       (n: String, v: Any) =>
-        FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
-    */
+        FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
   }
 
   private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
@@ -194,17 +175,14 @@
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
 
-    // See https://issues.apache.org/jira/browse/SPARK-11153
-    /*
     case StringType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8")))))
+          SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
     case BinaryType =>
       (n: String, v: Set[Any]) =>
         FilterApi.userDefined(binaryColumn(n),
-          SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
-    */
+          SetInFilter(v.map(e => Binary.fromReusedByteArray(e.asInstanceOf[Array[Byte]]))))
   }
 
   /**
@@ -228,8 +206,6 @@
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
     val dataTypeOf = getFieldMap(schema).toMap
 
-    relaxParquetValidTypeMap
-
     // NOTE:
     //
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -299,35 +275,4 @@
       case _ => None
     }
   }
-
-  // !! HACK ALERT !!
-  //
-  // This lazy val is a workaround for PARQUET-201, and should be removed once we upgrade to
-  // parquet-mr 1.8.1 or higher versions.
-  //
-  // In Parquet, not all types of columns can be used for filter push-down optimization. The set
-  // of valid column types is controlled by `ValidTypeMap`. Unfortunately, in parquet-mr 1.7.0 and
-  // prior versions, the limitation is too strict, and doesn't allow `BINARY (ENUM)` columns to be
-  // pushed down.
-  //
-  // This restriction is problematic for Spark SQL, because Spark SQL doesn't have a type that maps
-  // to Parquet original type `ENUM` directly, and always converts `ENUM` to `StringType`. Thus,
-  // a predicate involving a `ENUM` field can be pushed-down as a string column, which is perfectly
-  // legal except that it fails the `ValidTypeMap` check.
-  //
-  // Here we add `BINARY (ENUM)` into `ValidTypeMap` lazily via reflection to workaround this issue.
-  private lazy val relaxParquetValidTypeMap: Unit = {
-    val constructor = Class
-      .forName(classOf[ValidTypeMap].getCanonicalName + "$FullTypeDescriptor")
-      .getDeclaredConstructor(classOf[PrimitiveTypeName], classOf[OriginalType])
-
-    constructor.setAccessible(true)
-    val enumTypeDescriptor = constructor
-      .newInstance(PrimitiveTypeName.BINARY, OriginalType.ENUM)
-      .asInstanceOf[AnyRef]
-
-    val addMethod = classOf[ValidTypeMap].getDeclaredMethods.find(_.getName == "add").get
-    addMethod.setAccessible(true)
-    addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
-  }
 }
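The filter changes above all follow the same pattern: with parquet-mr 1.8.x, string literals go through `Binary.fromString` and raw byte arrays through `Binary.fromReusedByteArray`, replacing the older `Binary.fromByteArray` calls. A standalone sketch of the API (column names are assumptions, not from the PR):

  import org.apache.parquet.filter2.predicate.FilterApi
  import org.apache.parquet.filter2.predicate.FilterApi.binaryColumn
  import org.apache.parquet.io.api.Binary

  // Push-down predicate on a UTF-8 string column.
  val nameIsAlice = FilterApi.eq(binaryColumn("name"), Binary.fromString("alice"))

  // Push-down predicate on a raw binary column; the "reused" variant wraps the
  // array without copying it.
  val payloadMatches =
    FilterApi.eq(binaryColumn("payload"), Binary.fromReusedByteArray(Array[Byte](1, 2, 3)))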