
Commit 8b16630

another fix
1 parent 326c82c

4 files changed: +11, -14 lines


sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 3 additions & 1 deletion
@@ -1418,12 +1418,14 @@ class DataFrame private[sql](
   lazy val rdd: RDD[Row] = {
     // use a local variable to make sure the map closure doesn't capture the whole DataFrame
     val schema = this.schema
-    queryExecution.executedPlan.execute().mapPartitions { rows =>
+    internalRowRdd.mapPartitions { rows =>
       val converter = CatalystTypeConverters.createToScalaConverter(schema)
       rows.map(converter(_).asInstanceOf[Row])
     }
   }
 
+  private[sql] def internalRowRdd = queryExecution.executedPlan.execute()
+
   /**
    * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
    * @group rdd
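
For reference, a minimal usage sketch (not part of this commit) contrasting the two entry points. It assumes a DataFrame named df already in scope and calling code that lives inside the org.apache.spark.sql package, since internalRowRdd is private[sql]:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Public API: every row produced by the executed plan is converted into an external
// Scala Row through CatalystTypeConverters, exactly as the hunk above shows.
val externalRows: RDD[Row] = df.rdd

// New internal helper: the raw output of the executed physical plan, with no per-row
// conversion. Internal callers can aggregate over these Catalyst rows directly and
// decide only at the very end whether any conversion is needed.
val internalRows = df.internalRowRdd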

sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala

Lines changed: 5 additions & 10 deletions
@@ -22,7 +22,6 @@ import scala.collection.mutable.{Map => MutableMap}
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.types.{ArrayType, StructField, StructType}
 import org.apache.spark.sql.{Column, DataFrame}
 
@@ -91,7 +90,7 @@ private[sql] object FrequentItems extends Logging {
       (name, originalSchema.fields(index).dataType)
     }
 
-    val freqItems = df.select(cols.map(Column(_)) : _*).rdd.aggregate(countMaps)(
+    val freqItems = df.select(cols.map(Column(_)) : _*).internalRowRdd.aggregate(countMaps)(
       seqOp = (counts, row) => {
         var i = 0
         while (i < numCols) {
@@ -111,17 +110,13 @@
         baseCounts
       }
     )
-
+    val justItems = freqItems.map(m => m.baseMap.keys.toSeq)
+    val resultRow = InternalRow(justItems : _*)
     // append frequent Items to the column name for easy debugging
     val outputCols = colInfo.map { v =>
       StructField(v._1 + "_freqItems", ArrayType(v._2, false))
     }
-    val schema = StructType(outputCols)
-
-    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
-    val justItems = freqItems.map(m => m.baseMap.keys.toSeq)
-    val resultRow = converter(InternalRow(justItems : _*)).asInstanceOf[InternalRow]
-
-    new DataFrame(df.sqlContext, LocalRelation(schema.toAttributes, Seq(resultRow)))
+    val schema = StructType(outputCols).toAttributes
+    new DataFrame(df.sqlContext, LocalRelation(schema, Seq(resultRow)))
   }
 }
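
The CatalystTypeConverters round-trip can be dropped here because the aggregation now runs over internalRowRdd, so the collected values are already in Catalyst's internal representation and the single result row can be handed straight to LocalRelation. A hedged sketch of that pattern as a stand-alone helper (hypothetical name and placement, not part of this commit; it relies only on calls that appear in this diff and assumes it lives under org.apache.spark.sql so the private[sql] DataFrame constructor is visible):

package org.apache.spark.sql.execution.stat

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.expressions._   // brings InternalRow into scope, as in FrequentItems
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types.StructType

private[sql] object SingleRowRelationSketch {
  // Builds a one-row DataFrame from values that are already in the internal format.
  // Before this commit, FrequentItems first pushed the values through
  // CatalystTypeConverters.createToCatalystConverter(schema); that step is now gone.
  def singleRowDataFrame(sqlContext: SQLContext, schema: StructType, values: Seq[Any]): DataFrame = {
    val resultRow = InternalRow(values : _*)
    new DataFrame(sqlContext, LocalRelation(schema.toAttributes, Seq(resultRow)))
  }
}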

sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala

Lines changed: 1 addition & 1 deletion
@@ -81,7 +81,7 @@ private[sql] object StatFunctions extends Logging {
         s"with dataType ${data.get.dataType} not supported.")
     }
     val columns = cols.map(n => Column(Cast(Column(n).expr, DoubleType)))
-    df.select(columns: _*).rdd.aggregate(new CovarianceCounter)(
+    df.select(columns: _*).internalRowRdd.aggregate(new CovarianceCounter)(
       seqOp = (counter, row) => {
         counter.add(row.getDouble(0), row.getDouble(1))
       },

sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 2 additions & 2 deletions
@@ -154,7 +154,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
     writerContainer.driverSideSetup()
 
     try {
-      df.sqlContext.sparkContext.runJob(df.queryExecution.executedPlan.execute(), writeRows _)
+      df.sqlContext.sparkContext.runJob(df.internalRowRdd, writeRows _)
       writerContainer.commitJob()
       relation.refresh()
     } catch { case cause: Throwable =>
@@ -220,7 +220,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
     writerContainer.driverSideSetup()
 
     try {
-      df.sqlContext.sparkContext.runJob(df.queryExecution.executedPlan.execute(), writeRows _)
+      df.sqlContext.sparkContext.runJob(df.internalRowRdd, writeRows _)
       writerContainer.commitJob()
       relation.refresh()
     } catch { case cause: Throwable =>
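
Both write paths now obtain the row RDD through the shared helper rather than reaching into the query execution themselves. As a hedged illustration (assuming a DataFrame df in scope inside the org.apache.spark.sql package), the two expressions below are interchangeable after this commit, since internalRowRdd is defined as exactly this plan execution; runJob then hands each partition of internal rows to writeRows as before:

// Equivalent by definition of the new helper in DataFrame.scala.
val viaHelper = df.internalRowRdd
val viaPlan   = df.queryExecution.executedPlan.execute()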
