
Commit 596c312

Uses a switch to control whether to use the Parquet data source or not

1 parent: 7d0f7a2

File tree

12 files changed: +589 −542 lines

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 1 addition & 1 deletion
@@ -107,7 +107,7 @@ private[sql] class SQLConf extends Serializable {
     getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
 
   /** When true uses Parquet implementation based on data source API */
-  private[spark] def parquetUseDataSourceApi=
+  private[spark] def parquetUseDataSourceApi =
     getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean
 
   /** When true the planner will use the external sort, which may spill to disk. */
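A minimal usage sketch of the new flag. The config key string "spark.sql.parquet.useDataSourceApi" is assumed from the PARQUET_USE_DATA_SOURCE_API constant, whose value is not shown in this diff; per the getConf default above, it is treated as "true" when unset:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // Assumed key; setting it to "false" makes parquetFile() fall back to the
    // old ParquetRelation instead of the data source API implementation.
    sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")
    println(sqlContext.getConf("spark.sql.parquet.useDataSourceApi"))  // "false"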

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 11 additions & 6 deletions
@@ -20,14 +20,13 @@ package org.apache.spark.sql
 import java.beans.Introspector
 import java.util.Properties
 
-import scala.collection.immutable
 import scala.collection.JavaConversions._
+import scala.collection.immutable
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.spark.{SparkContext, Partition}
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
-import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis._
@@ -36,11 +35,12 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.json._
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.json._
+import org.apache.spark.sql.sources.{BaseRelation, DDLParser, DataSourceStrategy, LogicalRelation, _}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
+import org.apache.spark.{Partition, SparkContext}
 
 /**
  * :: AlphaComponent ::
@@ -305,7 +305,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @scala.annotation.varargs
   def parquetFile(paths: String*): DataFrame =
-    baseRelationToDataFrame(parquet.ParquetRelation2(paths, Map.empty)(this))
+    if (conf.parquetUseDataSourceApi) {
+      baseRelationToDataFrame(parquet.ParquetRelation2(paths, Map.empty)(this))
+    } else {
+      new DataFrame(this, parquet.ParquetRelation(
+        paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
+    }
 
   /**
    * Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
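A short caller-side sketch of how the switch plays out (paths and table name are illustrative):

    // With spark.sql.parquet.useDataSourceApi unset or "true" (the default),
    // this DataFrame is backed by parquet.ParquetRelation2 via the data source
    // API; with "false", by the older parquet.ParquetRelation, which receives
    // the paths joined into a single comma-separated string.
    val df = sqlContext.parquetFile("/data/a.parquet", "/data/b.parquet")
    df.registerTempTable("events")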

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 4 additions & 5 deletions
@@ -17,18 +17,17 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{SQLContext, Strategy, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
+import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
 import org.apache.spark.sql.parquet._
+import org.apache.spark.sql.sources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.sources.{DescribeCommand => LogicalDescribeCommand}
-import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
-import org.apache.spark.sql.sources._
+import org.apache.spark.sql.{SQLContext, Strategy, execution}
 
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>
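The import cleanup above relies on Scala's rename imports to keep the two same-named DescribeCommand classes (logical vs. runnable) in one scope. A generic, self-contained sketch of that pattern, with toy objects and hypothetical names:

    object logicalpkg  { case class DescribeCommand(table: String) }
    object physicalpkg { case class DescribeCommand(table: String) }

    object RenameImportDemo extends App {
      import logicalpkg.{DescribeCommand => LogicalDescribe}
      import physicalpkg.{DescribeCommand => RunnableDescribe}

      // Both types now coexist without full package prefixes.
      println(LogicalDescribe("t"))
      println(RunnableDescribe("t"))
    }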

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala

Lines changed: 1 addition & 1 deletion
@@ -90,7 +90,7 @@ trait ParquetTest {
       (f: String => Unit): Unit = {
     import sqlContext.implicits._
     withTempPath { file =>
-      sparkContext.parallelize(data).saveAsParquetFile(file.getCanonicalPath)
+      sparkContext.parallelize(data, 3).saveAsParquetFile(file.getCanonicalPath)
       f(file.getCanonicalPath)
     }
   }
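Passing an explicit slice count here splits the test data across three partitions, so each write produces several Parquet part files and the suites also exercise multi-file reads. A minimal sketch of the same effect (hypothetical schema and path; relies on the SQLContext implicits exactly as the trait does):

    case class Record(i: Int)

    val data = (1 to 9).map(Record(_))
    // numSlices = 3 => three write tasks => three part-*.parquet files.
    sparkContext.parallelize(data, 3).saveAsParquetFile("/tmp/parquet-demo")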

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala

Lines changed: 182 additions & 172 deletions
Diff not shown (too large to render).
