diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index b5c38a6c056e..96001eade028 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -129,13 +129,14 @@ opencsv-2.3.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 969df0495d4c..9f3d9ad97a9f 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -136,13 +136,14 @@ opencsv-2.3.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index f0491ece7c2b..77d52666ba69 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -136,13 +136,14 @@ opencsv-2.3.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index b3dced63b9e7..9afe50f765d3 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -144,13 +144,14 @@ opencsv-2.3.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 16f60f29ffbb..879157a6dce7 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -145,13 +145,14 @@ opencsv-2.3.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.3.jar
-parquet-column-1.8.1.jar
-parquet-common-1.8.1.jar
-parquet-encoding-1.8.1.jar
+parquet-column-1.7.0.jar
+parquet-common-1.7.0.jar
+parquet-encoding-1.7.0.jar
parquet-format-2.3.0-incubating.jar
-parquet-hadoop-1.8.1.jar
+parquet-generator-1.7.0.jar
+parquet-hadoop-1.7.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.1.jar
+parquet-jackson-1.7.0.jar
pmml-model-1.2.15.jar
pmml-schema-1.2.15.jar
protobuf-java-2.5.0.jar
diff --git a/pom.xml b/pom.xml
index 60c8c8dc7a72..79ee7876f466 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,7 +133,7 @@
1.2.1
10.11.1.1
- 1.8.1
+ 1.7.0
1.6.0
9.2.16.v20160414
3.1.0
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 3f7a872ff635..cbe8f78164ae 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -58,8 +58,6 @@
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.apache.spark.sql.types.StructType;
@@ -188,19 +186,15 @@ protected void initialize(String path, List columns) throws IOException
if (columns == null) {
this.requestedSchema = fileSchema;
} else {
- if (columns.size() > 0) {
- Types.MessageTypeBuilder builder = Types.buildMessage();
- for (String s: columns) {
- if (!fileSchema.containsField(s)) {
- throw new IOException("Can only project existing columns. Unknown field: " + s +
- " File schema:\n" + fileSchema);
- }
- builder.addFields(fileSchema.getType(s));
+ Types.MessageTypeBuilder builder = Types.buildMessage();
+ for (String s: columns) {
+ if (!fileSchema.containsField(s)) {
+ throw new IOException("Can only project existing columns. Unknown field: " + s +
+ " File schema:\n" + fileSchema);
}
- this.requestedSchema = builder.named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
- } else {
- this.requestedSchema = CatalystSchemaConverter.EMPTY_MESSAGE();
+ builder.addFields(fileSchema.getType(s));
}
+ this.requestedSchema = builder.named("spark_schema");
}
this.sparkSchema = new CatalystSchemaConverter(config).convert(requestedSchema);
this.reader = new ParquetFileReader(config, file, blocks, requestedSchema.getColumns());
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 9c885b252f01..850e807b8677 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -109,14 +109,10 @@ private[parquet] object CatalystReadSupport {
*/
def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
- if (clippedParquetFields.isEmpty) {
- CatalystSchemaConverter.EMPTY_MESSAGE
- } else {
- Types
- .buildMessage()
- .addFields(clippedParquetFields: _*)
- .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
- }
+ Types
+ .buildMessage()
+ .addFields(clippedParquetFields: _*)
+ .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
}
private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 3688c3e2b57e..6f6340f541ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -538,22 +538,6 @@ private[parquet] class CatalystSchemaConverter(
private[parquet] object CatalystSchemaConverter {
val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
- // !! HACK ALERT !!
- //
- // PARQUET-363 & PARQUET-278: parquet-mr 1.8.1 doesn't allow constructing empty GroupType,
- // which prevents us to avoid selecting any columns for queries like `SELECT COUNT(*) FROM t`.
- // This issue has been fixed in parquet-mr 1.8.2-SNAPSHOT.
- //
- // To workaround this problem, here we first construct a `MessageType` with a single dummy
- // field, and then remove the field to obtain an empty `MessageType`.
- //
- // TODO Reverts this change after upgrading parquet-mr to 1.8.2+
- val EMPTY_MESSAGE = Types
- .buildMessage()
- .required(PrimitiveType.PrimitiveTypeName.INT32).named("dummy")
- .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
- EMPTY_MESSAGE.getFields.clear()
-
def checkFieldName(name: String): Unit = {
// ,;{}()\n\t= and space are special characters in Parquet schema
checkConversionRequirement(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 624081250113..95afdc789f32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -22,6 +22,8 @@ import java.io.Serializable
import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary
+import org.apache.parquet.schema.OriginalType
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
@@ -51,15 +53,18 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
// Binary.fromString and Binary.fromByteArray don't accept null values
case StringType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
- Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
+ Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
case BinaryType =>
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
- Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+ Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+ */
}
private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -74,14 +79,17 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
- Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
+ Option(v).map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8"))).orNull)
case BinaryType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
- Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+ Option(v).map(b => Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+ */
}
private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -94,13 +102,16 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n),
- Binary.fromString(v.asInstanceOf[String]))
+ Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
case BinaryType =>
(n: String, v: Any) =>
- FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+ FilterApi.lt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ */
}
private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -113,13 +124,16 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n),
- Binary.fromString(v.asInstanceOf[String]))
+ Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
case BinaryType =>
(n: String, v: Any) =>
- FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+ FilterApi.ltEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ */
}
private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -133,13 +147,15 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
// See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n),
- Binary.fromString(v.asInstanceOf[String]))
+ Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
case BinaryType =>
(n: String, v: Any) =>
- FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+ FilterApi.gt(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ */
}
private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
@@ -152,13 +168,16 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n),
- Binary.fromString(v.asInstanceOf[String]))
+ Binary.fromByteArray(v.asInstanceOf[String].getBytes("utf-8")))
case BinaryType =>
(n: String, v: Any) =>
- FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
+ FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ */
}
private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
@@ -175,14 +194,17 @@ private[sql] object ParquetFilters {
(n: String, v: Set[Any]) =>
FilterApi.userDefined(doubleColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
- SetInFilter(v.map(s => Binary.fromString(s.asInstanceOf[String]))))
+ SetInFilter(v.map(s => Binary.fromByteArray(s.asInstanceOf[String].getBytes("utf-8")))))
case BinaryType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
- SetInFilter(v.map(e => Binary.fromReusedByteArray(e.asInstanceOf[Array[Byte]]))))
+ SetInFilter(v.map(e => Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
+ */
}
/**
@@ -206,6 +228,8 @@ private[sql] object ParquetFilters {
def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
val dataTypeOf = getFieldMap(schema).toMap
+ relaxParquetValidTypeMap
+
// NOTE:
//
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -275,4 +299,35 @@ private[sql] object ParquetFilters {
case _ => None
}
}
+
+ // !! HACK ALERT !!
+ //
+ // This lazy val is a workaround for PARQUET-201, and should be removed once we upgrade to
+ // parquet-mr 1.8.1 or higher versions.
+ //
+ // In Parquet, not all types of columns can be used for filter push-down optimization. The set
+ // of valid column types is controlled by `ValidTypeMap`. Unfortunately, in parquet-mr 1.7.0 and
+ // prior versions, the limitation is too strict, and doesn't allow `BINARY (ENUM)` columns to be
+ // pushed down.
+ //
+ // This restriction is problematic for Spark SQL, because Spark SQL doesn't have a type that maps
+ // to Parquet original type `ENUM` directly, and always converts `ENUM` to `StringType`. Thus,
+ // a predicate involving a `ENUM` field can be pushed-down as a string column, which is perfectly
+ // legal except that it fails the `ValidTypeMap` check.
+ //
+ // Here we add `BINARY (ENUM)` into `ValidTypeMap` lazily via reflection to workaround this issue.
+ private lazy val relaxParquetValidTypeMap: Unit = {
+ val constructor = Class
+ .forName(classOf[ValidTypeMap].getCanonicalName + "$FullTypeDescriptor")
+ .getDeclaredConstructor(classOf[PrimitiveTypeName], classOf[OriginalType])
+
+ constructor.setAccessible(true)
+ val enumTypeDescriptor = constructor
+ .newInstance(PrimitiveTypeName.BINARY, OriginalType.ENUM)
+ .asInstanceOf[AnyRef]
+
+ val addMethod = classOf[ValidTypeMap].getDeclaredMethods.find(_.getName == "add").get
+ addMethod.setAccessible(true)
+ addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 0b5038cb8280..6db649228210 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
-import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
@@ -1065,26 +1065,18 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
parquetSchema: String,
catalystSchema: StructType,
expectedSchema: String): Unit = {
- testSchemaClipping(testName, parquetSchema, catalystSchema,
- MessageTypeParser.parseMessageType(expectedSchema))
- }
-
- private def testSchemaClipping(
- testName: String,
- parquetSchema: String,
- catalystSchema: StructType,
- expectedSchema: MessageType): Unit = {
test(s"Clipping - $testName") {
+ val expected = MessageTypeParser.parseMessageType(expectedSchema)
val actual = CatalystReadSupport.clipParquetSchema(
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
try {
- expectedSchema.checkContains(actual)
- actual.checkContains(expectedSchema)
+ expected.checkContains(actual)
+ actual.checkContains(expected)
} catch { case cause: Throwable =>
fail(
s"""Expected clipped schema:
- |$expectedSchema
+ |$expected
|Actual clipped schema:
|$actual
""".stripMargin,
@@ -1437,7 +1429,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
catalystSchema = new StructType(),
- expectedSchema = CatalystSchemaConverter.EMPTY_MESSAGE)
+ expectedSchema = "message root {}")
testSchemaClipping(
"disjoint field sets",