Skip to content

Commit ca5b825

Browse files
Inject SQLContext into SparkLogicalPlan, removing SQLConf mixin from it.
1 parent 43d38a6 commit ca5b825

File tree

6 files changed

+21
-17
lines changed

6 files changed

+21
-17
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -86,7 +86,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
8686
* @group userf
8787
*/
8888
implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
89-
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))
89+
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))
9090

9191
/**
9292
* Loads a Parquet file, returning the result as a [[SchemaRDD]].
@@ -127,7 +127,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
127127
*/
128128
@Experimental
129129
def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD =
130-
new SchemaRDD(this, JsonRDD.inferSchema(json, samplingRatio))
130+
new SchemaRDD(this, JsonRDD.inferSchema(self, json, samplingRatio))
131131

132132
/**
133133
* :: Experimental ::
@@ -208,7 +208,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
208208
case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
209209
inMem.cachedColumnBuffers.unpersist()
210210
catalog.unregisterTable(None, tableName)
211-
catalog.registerTable(None, tableName, SparkLogicalPlan(e))
211+
catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))
212212
case inMem: InMemoryRelation =>
213213
inMem.cachedColumnBuffers.unpersist()
214214
catalog.unregisterTable(None, tableName)
@@ -367,7 +367,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
367367
new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row
368368
}
369369
}
370-
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
370+
new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(self))
371371
}
372372

373373
}

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -430,7 +430,8 @@ class SchemaRDD(
430430
* @group schema
431431
*/
432432
private def applySchema(rdd: RDD[Row]): SchemaRDD = {
433-
new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd)))
433+
new SchemaRDD(sqlContext,
434+
SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd))(sqlContext))
434435
}
435436

436437
// =======================================================================

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -56,7 +56,7 @@ private[sql] trait SchemaRDDLike {
5656
// happen right away to let these side effects take place eagerly.
5757
case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
5858
queryExecution.toRdd
59-
SparkLogicalPlan(queryExecution.executedPlan)
59+
SparkLogicalPlan(queryExecution.executedPlan)(sqlContext)
6060
case _ =>
6161
baseLogicalPlan
6262
}

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -92,7 +92,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
9292
new GenericRow(extractors.map(e => e.invoke(row)).toArray[Any]): ScalaRow
9393
}
9494
}
95-
new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
95+
new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(sqlContext))
9696
}
9797

9898
/**
@@ -120,7 +120,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
120120
* @group userf
121121
*/
122122
def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD =
123-
new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0))
123+
new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(sqlContext, json, 1.0))
124124

125125
/**
126126
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
1919

2020
import org.apache.spark.annotation.DeveloperApi
2121
import org.apache.spark.rdd.RDD
22-
import org.apache.spark.sql.{Logging, Row, SQLConf}
22+
import org.apache.spark.sql.{Logging, Row, SQLContext}
2323
import org.apache.spark.sql.catalyst.trees
2424
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
2525
import org.apache.spark.sql.catalyst.expressions.GenericRow
@@ -66,8 +66,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
6666
* linking.
6767
*/
6868
@DeveloperApi
69-
case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
70-
extends LogicalPlan with MultiInstanceRelation with SQLConf {
69+
case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQLContext)
70+
extends LogicalPlan with MultiInstanceRelation {
7171

7272
def output = alreadyPlanned.output
7373
override def references = Set.empty
@@ -78,7 +78,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
7878
alreadyPlanned match {
7979
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
8080
case _ => sys.error("Multiple instance of the same relation detected.")
81-
}).asInstanceOf[this.type]
81+
})(sqlContext).asInstanceOf[this.type]
8282
}
8383

8484
@transient override lazy val statistics = Statistics(
@@ -89,7 +89,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
8989
alreadyPlanned match {
9090
// TODO: Instead of returning a default value here, find a way to return a meaningful
9191
// size estimate for RDDs. See PR 1238 for more discussions.
92-
case e: ExistingRdd if naiveVal == 1L => statsDefaultSizeInBytes
92+
case e: ExistingRdd if naiveVal == 1L => sqlContext.statsDefaultSizeInBytes
9393
case _ => naiveVal
9494
}
9595
}

sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -28,27 +28,30 @@ import org.apache.spark.sql.catalyst.expressions._
2828
import org.apache.spark.sql.catalyst.plans.logical._
2929
import org.apache.spark.sql.catalyst.types._
3030
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
31-
import org.apache.spark.sql.Logging
31+
import org.apache.spark.sql.{SQLContext, Logging}
3232

3333
private[sql] object JsonRDD extends Logging {
3434

3535
private[sql] def inferSchema(
36+
sqlContext: SQLContext,
3637
json: RDD[String],
3738
samplingRatio: Double = 1.0): LogicalPlan = {
3839
require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
3940
val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1)
4041
val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _)
4142
val baseSchema = createSchema(allKeys)
4243

43-
createLogicalPlan(json, baseSchema)
44+
createLogicalPlan(json, baseSchema, sqlContext)
4445
}
4546

4647
private def createLogicalPlan(
4748
json: RDD[String],
48-
baseSchema: StructType): LogicalPlan = {
49+
baseSchema: StructType,
50+
sqlContext: SQLContext): LogicalPlan = {
4951
val schema = nullTypeToStringType(baseSchema)
5052

51-
SparkLogicalPlan(ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema))))
53+
SparkLogicalPlan(
54+
ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema))))(sqlContext)
5255
}
5356

5457
private def createSchema(allKeys: Set[(String, DataType)]): StructType = {

0 commit comments

Comments
 (0)