Skip to content

Commit 76de22b

Browse files
committed
Allow schema calculation to be lazy, but ensure it's available on executors.
1 parent 13102ee commit 76de22b

File tree

2 files changed

+11
-12
lines changed

2 files changed

+11
-12
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,18 @@ class SchemaRDD(
117117

118118
override def getPartitions: Array[Partition] = firstParent[Row].partitions
119119

120-
override protected def getDependencies: Seq[Dependency[_]] =
120+
override protected def getDependencies: Seq[Dependency[_]] = {
121+
schema // Force reification of the schema so it is available on executors.
122+
121123
List(new OneToOneDependency(queryExecution.toRdd))
124+
}
122125

123-
/** Returns the schema of this SchemaRDD (represented by a [[StructType]]).
124-
*
125-
* @group schema
126-
*/
127-
val schema: StructType = queryExecution.analyzed.schema
126+
/**
127+
* Returns the schema of this SchemaRDD (represented by a [[StructType]]).
128+
*
129+
* @group schema
130+
*/
131+
lazy val schema: StructType = queryExecution.analyzed.schema
128132

129133
// =======================================================================
130134
// Query DSL

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -355,11 +355,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
355355
/** Extends QueryExecution with hive specific features. */
356356
protected[sql] abstract class QueryExecution extends super.QueryExecution {
357357

358-
override lazy val toRdd: RDD[Row] = {
359-
val schema = StructType.fromAttributes(logical.output)
360-
executedPlan.execute().map(ScalaReflection.convertRowToScala(_, schema))
361-
}
362-
363358
protected val primitiveTypes =
364359
Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
365360
ShortType, DecimalType, DateType, TimestampType, BinaryType)
@@ -414,7 +409,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
414409
command.executeCollect().map(_.head.toString)
415410

416411
case other =>
417-
val result: Seq[Seq[Any]] = toRdd.collect().toSeq
412+
val result: Seq[Seq[Any]] = toRdd.map(_.copy()).collect().toSeq
418413
// We need the types so we can output struct field names
419414
val types = analyzed.output.map(_.dataType)
420415
// Reformat to match hive tab delimited output.

0 commit comments

Comments
 (0)