Skip to content

Commit 1d00937

Browse files
committed
Makes SchemaRDD DSLs work for DDL/command statement RDDs
1 parent 5c7e680 commit 1d00937

File tree

3 files changed

+8
-5
lines changed

3 files changed

+8
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ import java.util.{Map => JMap}
9797
@AlphaComponent
9898
class SchemaRDD(
9999
@transient val sqlContext: SQLContext,
100-
@transient protected[spark] val logicalPlan: LogicalPlan)
100+
@transient val baseLogicalPlan: LogicalPlan)
101101
extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {
102102

103103
def baseSchemaRDD = this

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ package org.apache.spark.sql
2020
import org.apache.spark.annotation.{DeveloperApi, Experimental}
2121
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
2222
import org.apache.spark.sql.catalyst.plans.logical._
23+
import org.apache.spark.sql.execution.SparkLogicalPlan
2324

2425
/**
2526
* Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java)
2627
*/
2728
private[sql] trait SchemaRDDLike {
2829
@transient val sqlContext: SQLContext
29-
@transient protected[spark] val logicalPlan: LogicalPlan
30+
@transient val baseLogicalPlan: LogicalPlan
3031

3132
private[sql] def baseSchemaRDD: SchemaRDD
3233

@@ -48,14 +49,16 @@ private[sql] trait SchemaRDDLike {
4849
*/
4950
@transient
5051
@DeveloperApi
51-
lazy val queryExecution = sqlContext.executePlan(logicalPlan)
52+
lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
5253

53-
logicalPlan match {
54+
@transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
5455
// For various commands (like DDL) and queries with side effects, we force query optimization to
5556
// happen right away to let these side effects take place eagerly.
5657
case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
5758
queryExecution.toRdd
59+
SparkLogicalPlan(queryExecution.executedPlan)
5860
case _ =>
61+
baseLogicalPlan
5962
}
6063

6164
override def toString =

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel
3737
*/
3838
class JavaSchemaRDD(
3939
@transient val sqlContext: SQLContext,
40-
@transient protected[spark] val logicalPlan: LogicalPlan)
40+
@transient val baseLogicalPlan: LogicalPlan)
4141
extends JavaRDDLike[Row, JavaRDD[Row]]
4242
with SchemaRDDLike {
4343

0 commit comments

Comments
 (0)