
Commit 2d688d6

Renames ParquetRelation2 to ParquetRelation
1 parent ca9e1b7 commit 2d688d6


12 files changed: +55, -56 lines

12 files changed

+55
-56
lines changed
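Because this commit is a pure rename (ParquetRelation2 becomes ParquetRelation, plus some import reordering), call sites and pattern matches update mechanically, as the diffs below show. A minimal sketch of the kind of match that changes, modeled on the test updates in this commit; the package and object names are hypothetical, and ParquetRelation itself is private[sql], so such code has to live under org.apache.spark.sql:

```scala
package org.apache.spark.sql.inspect // hypothetical package, needed only for private[sql] access

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.parquet.ParquetRelation // was ParquetRelation2 before this commit

object RelationInspector {
  // Returns the Parquet paths backing a DataFrame, if its optimized plan reads from Parquet.
  def parquetPaths(df: DataFrame): Seq[String] =
    df.queryExecution.optimizedPlan.collect {
      case LogicalRelation(relation: ParquetRelation) => relation.paths.toSeq
    }.flatten
}
```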

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 4 additions & 4 deletions

@@ -21,16 +21,16 @@ import java.util.Properties
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{Logging, Partition}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.json.JSONRelation
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.{Logging, Partition}
 
 /**
  * :: Experimental ::
@@ -259,7 +259,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
     }.toArray
 
     sqlContext.baseRelationToDataFrame(
-      new ParquetRelation2(
+      new ParquetRelation(
         globbedPaths.map(_.toString), None, None, extraOptions.toMap)(sqlContext))
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

Lines changed: 1 addition & 0 deletions

@@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet
 
 import org.apache.spark.sql.catalyst.InternalRow
 
+// TODO Removes this while fixing SPARK-8848
 private[sql] object CatalystConverter {
   // This is mostly Parquet convention (see, e.g., `ConversionPatterns`).
   // Note that "array" for the array elements is chosen by ParquetAvro.

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala renamed to sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 12 additions & 12 deletions

@@ -57,7 +57,7 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider {
       schema: Option[StructType],
       partitionColumns: Option[StructType],
       parameters: Map[String, String]): HadoopFsRelation = {
-    new ParquetRelation2(paths, schema, None, partitionColumns, parameters)(sqlContext)
+    new ParquetRelation(paths, schema, None, partitionColumns, parameters)(sqlContext)
   }
 }
 
@@ -93,7 +93,7 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
   override def close(): Unit = recordWriter.close(context)
 }
 
-private[sql] class ParquetRelation2(
+private[sql] class ParquetRelation(
     override val paths: Array[String],
     private val maybeDataSchema: Option[StructType],
     // This is for metastore conversion.
@@ -121,12 +121,12 @@ private[sql] class ParquetRelation2(
   // Should we merge schemas from all Parquet part-files?
   private val shouldMergeSchemas =
     parameters
-      .get(ParquetRelation2.MERGE_SCHEMA)
+      .get(ParquetRelation.MERGE_SCHEMA)
       .map(_.toBoolean)
       .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
 
   private val maybeMetastoreSchema = parameters
-    .get(ParquetRelation2.METASTORE_SCHEMA)
+    .get(ParquetRelation.METASTORE_SCHEMA)
     .map(DataType.fromJson(_).asInstanceOf[StructType])
 
   private lazy val metadataCache: MetadataCache = {
@@ -136,7 +136,7 @@ private[sql] class ParquetRelation2(
   }
 
   override def equals(other: Any): Boolean = other match {
-    case that: ParquetRelation2 =>
+    case that: ParquetRelation =>
       val schemaEquality = if (shouldMergeSchemas) {
         this.shouldMergeSchemas == that.shouldMergeSchemas
       } else {
@@ -242,7 +242,7 @@ private[sql] class ParquetRelation2(
     // Sets compression scheme
     conf.set(
       ParquetOutputFormat.COMPRESSION,
-      ParquetRelation2
+      ParquetRelation
        .shortParquetCompressionCodecNames
        .getOrElse(
          sqlContext.conf.parquetCompressionCodec.toUpperCase,
@@ -269,7 +269,7 @@ private[sql] class ParquetRelation2(
 
     // Create the function to set variable Parquet confs at both driver and executor side.
     val initLocalJobFuncOpt =
-      ParquetRelation2.initializeLocalJobFunc(
+      ParquetRelation.initializeLocalJobFunc(
        requiredColumns,
        filters,
        dataSchema,
@@ -280,7 +280,7 @@ private[sql] class ParquetRelation2(
        followParquetFormatSpec) _
 
     // Create the function to set input paths at the driver side.
-    val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
+    val setInputPaths = ParquetRelation.initializeDriverSideJobFunc(inputFiles) _
 
     Utils.withDummyCallSite(sqlContext.sparkContext) {
       new SqlNewHadoopRDD(
@@ -387,7 +387,7 @@ private[sql] class ParquetRelation2(
      // case insensitivity issue and possible schema mismatch (probably caused by schema
      // evolution).
      maybeMetastoreSchema
-        .map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0))
+        .map(ParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0))
        .getOrElse(dataSchema0)
    }
  }
@@ -442,12 +442,12 @@ private[sql] class ParquetRelation2(
          "No predefined schema found, " +
            s"and no Parquet data files or summary files found under ${paths.mkString(", ")}.")
 
-        ParquetRelation2.mergeSchemasInParallel(filesToTouch, sqlContext)
+        ParquetRelation.mergeSchemasInParallel(filesToTouch, sqlContext)
      }
    }
  }
 
-private[sql] object ParquetRelation2 extends Logging {
+private[sql] object ParquetRelation extends Logging {
   // Whether we should merge schemas collected from all Parquet part-files.
   private[sql] val MERGE_SCHEMA = "mergeSchema"
 
@@ -691,7 +691,7 @@ private[sql] object ParquetRelation2 extends Logging {
        followParquetFormatSpec = followParquetFormatSpec)
 
      footers.map { footer =>
-        ParquetRelation2.readSchemaFromFooter(footer, converter)
+        ParquetRelation.readSchemaFromFooter(footer, converter)
      }.reduceOption(_ merge _).iterator
    }.collect()
 
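The renamed companion object keeps MERGE_SCHEMA bound to the data source option key "mergeSchema", falling back to SQLConf.PARQUET_SCHEMA_MERGING_ENABLED when the option is absent. A hedged sketch of how that option is typically supplied through the public DataFrameReader API; the path is a placeholder:

```scala
import org.apache.spark.sql.SQLContext

// Assumes an existing SQLContext; "/data/events" is a placeholder directory of Parquet part-files.
def readWithMergedSchema(sqlContext: SQLContext) = {
  sqlContext.read
    .option("mergeSchema", "true") // picked up by ParquetRelation via ParquetRelation.MERGE_SCHEMA
    .parquet("/data/events")
}
```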

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

Lines changed: 1 addition & 1 deletion

@@ -51,7 +51,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo
     }
 
     log.debug(s"write support initialized for requested schema $attributes")
-    ParquetRelation2.enableLogForwarding()
+    ParquetRelation.enableLogForwarding()
     new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata)
   }
 

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 2 additions & 2 deletions

@@ -104,7 +104,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
       extraMetadata,
       "Spark")
 
-    ParquetRelation2.enableLogForwarding()
+    ParquetRelation.enableLogForwarding()
     ParquetFileWriter.writeMetadataFile(
       conf,
       path,
@@ -140,7 +140,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
       (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
     }
 
-    ParquetRelation2.enableLogForwarding()
+    ParquetRelation.enableLogForwarding()
 
     // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
     // groups. Since Parquet schema is replicated among all row groups, we only need to touch a

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -56,7 +56,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
       .where(Column(predicate))
 
     val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
-      case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters
+      case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation)) => filters
    }.flatten.reduceOption(_ && _)
 
    assert(maybeAnalyzedPredicate.isDefined)

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala

Lines changed: 1 addition & 1 deletion

@@ -467,7 +467,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
       val queryExecution = sqlContext.read.parquet(dir.getCanonicalPath).queryExecution
       queryExecution.analyzed.collectFirst {
-        case LogicalRelation(relation: ParquetRelation2) =>
+        case LogicalRelation(relation: ParquetRelation) =>
           assert(relation.partitionSpec === PartitionSpec.emptySpec)
       }.getOrElse {
         fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala

Lines changed: 6 additions & 6 deletions

@@ -378,7 +378,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       StructField("lowerCase", StringType),
       StructField("UPPERCase", DoubleType, nullable = false)))) {
 
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      ParquetRelation.mergeMetastoreParquetSchema(
        StructType(Seq(
          StructField("lowercase", StringType),
          StructField("uppercase", DoubleType, nullable = false))),
@@ -393,7 +393,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
      StructType(Seq(
        StructField("UPPERCase", DoubleType, nullable = false)))) {
 
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      ParquetRelation.mergeMetastoreParquetSchema(
        StructType(Seq(
          StructField("uppercase", DoubleType, nullable = false))),
 
@@ -404,7 +404,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
 
    // Metastore schema contains additional non-nullable fields.
    assert(intercept[Throwable] {
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      ParquetRelation.mergeMetastoreParquetSchema(
        StructType(Seq(
          StructField("uppercase", DoubleType, nullable = false),
          StructField("lowerCase", BinaryType, nullable = false))),
@@ -415,7 +415,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
 
    // Conflicting non-nullable field names
    intercept[Throwable] {
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      ParquetRelation.mergeMetastoreParquetSchema(
        StructType(Seq(StructField("lower", StringType, nullable = false))),
        StructType(Seq(StructField("lowerCase", BinaryType))))
    }
@@ -429,7 +429,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
        StructField("firstField", StringType, nullable = true),
        StructField("secondField", StringType, nullable = true),
        StructField("thirdfield", StringType, nullable = true)))) {
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      ParquetRelation.mergeMetastoreParquetSchema(
        StructType(Seq(
          StructField("firstfield", StringType, nullable = true),
          StructField("secondfield", StringType, nullable = true),
@@ -442,7 +442,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
    // Merge should fail if the Metastore contains any additional fields that are not
    // nullable.
    assert(intercept[Throwable] {
-      ParquetRelation2.mergeMetastoreParquetSchema(
+      ParquetRelation.mergeMetastoreParquetSchema(
        StructType(Seq(
          StructField("firstfield", StringType, nullable = true),
          StructField("secondfield", StringType, nullable = true),

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 8 additions & 9 deletions

@@ -21,7 +21,6 @@ import scala.collection.JavaConversions._
 
 import com.google.common.base.Objects
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
-
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.metastore.Warehouse
@@ -30,7 +29,6 @@ import org.apache.hadoop.hive.ql.metadata._
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions._
@@ -39,10 +37,11 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.datasources
-import org.apache.spark.sql.execution.datasources.{Partition => ParquetPartition, PartitionSpec, CreateTableUsingAsSelect, ResolvedDataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
 import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
 
 
 private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext)
@@ -260,8 +259,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     // serialize the Metastore schema to JSON and pass it as a data source option because of the
     // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
     val parquetOptions = Map(
-      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
-      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+      ParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
+      ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
     val tableIdentifier =
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
 
@@ -272,7 +271,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
     cachedDataSourceTables.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
-      case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
+      case logical@LogicalRelation(parquetRelation: ParquetRelation) =>
        // If we have the same paths, same schema, and same partition spec,
        // we will use the cached Parquet Relation.
        val useCached =
@@ -317,7 +316,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
      val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
      val parquetRelation = cached.getOrElse {
        val created = LogicalRelation(
-          new ParquetRelation2(
+          new ParquetRelation(
            paths.toArray, None, Some(partitionSpec), parquetOptions)(hive))
        cachedDataSourceTables.put(tableIdentifier, created)
        created
@@ -330,7 +329,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
      val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
      val parquetRelation = cached.getOrElse {
        val created = LogicalRelation(
-          new ParquetRelation2(paths.toArray, None, None, parquetOptions)(hive))
+          new ParquetRelation(paths.toArray, None, None, parquetOptions)(hive))
        cachedDataSourceTables.put(tableIdentifier, created)
        created
      }
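HiveMetastoreCatalog is where a metastore Parquet table gets converted into the (now renamed) ParquetRelation and cached in cachedDataSourceTables. A hedged sketch of exercising or disabling that conversion path from user code; the table name is a placeholder, and spark.sql.hive.convertMetastoreParquet is the standard switch for this code path:

```scala
import org.apache.spark.sql.hive.HiveContext

// Assumes an existing HiveContext; "events" is a placeholder Parquet-backed Hive table.
def readMetastoreParquet(hive: HiveContext) = {
  // When true (the default), metastore Parquet tables are read through ParquetRelation
  // rather than the generic Hive SerDe path.
  hive.setConf("spark.sql.hive.convertMetastoreParquet", "true")
  hive.table("events")
}
```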

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 4 additions & 5 deletions

@@ -21,10 +21,9 @@ import java.io.File
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.scalatest.BeforeAndAfterAll
-
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.InvalidInputException
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.Logging
 import org.apache.spark.sql._
@@ -33,7 +32,7 @@ import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -579,9 +578,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA
         Row(3) :: Row(4) :: Nil)
 
       table("test_parquet_ctas").queryExecution.optimizedPlan match {
-        case LogicalRelation(p: ParquetRelation2) => // OK
+        case LogicalRelation(p: ParquetRelation) => // OK
        case _ =>
-          fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation2]}")
+          fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation]}")
      }
    }
  }
