Skip to content

Commit dd4f088

Browse files
dilipbiswalcloud-fan
authored andcommitted
[SPARK-18009][SQL] Fix ClassCastException while calling toLocalIterator() on dataframe produced by RunnableCommand
## What changes were proposed in this pull request? A short code snippet that uses toLocalIterator() on a dataframe produced by a RunnableCommand reproduces the problem. toLocalIterator() is called by thriftserver when `spark.sql.thriftServer.incrementalCollect`is set to handle queries producing large result set. **Before** ```SQL scala> spark.sql("show databases") res0: org.apache.spark.sql.DataFrame = [databaseName: string] scala> res0.toLocalIterator() 16/10/26 03:00:24 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow ``` **After** ```SQL scala> spark.sql("drop database databases") res30: org.apache.spark.sql.DataFrame = [] scala> spark.sql("show databases") res31: org.apache.spark.sql.DataFrame = [databaseName: string] scala> res31.toLocalIterator().asScala foreach println [default] [parquet] ``` ## How was this patch tested? Added a test in DDLSuite Author: Dilip Biswal <[email protected]> Closes #15642 from dilipbiswal/SPARK-18009.
1 parent f1aeed8 commit dd4f088

File tree

2 files changed

+9
-0
lines changed

2 files changed

+9
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan {
6666

6767
override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
6868

69+
override def executeToIterator: Iterator[InternalRow] = sideEffectResult.toIterator
70+
6971
override def executeTake(limit: Int): Array[InternalRow] = sideEffectResult.take(limit).toArray
7072

7173
protected override def doExecute(): RDD[InternalRow] = {

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1805,4 +1805,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
18051805
}
18061806
}
18071807
}
1808+
1809+
test("SPARK-18009 calling toLocalIterator on commands") {
1810+
import scala.collection.JavaConverters._
1811+
val df = sql("show databases")
1812+
val rows: Seq[Row] = df.toLocalIterator().asScala.toSeq
1813+
assert(rows.length > 0)
1814+
}
18081815
}

0 commit comments

Comments
 (0)