From 192a25c1f0ade44a59d01dd4220f0022a8f18c43 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 1 Jun 2014 21:41:55 +0800 Subject: [PATCH 1/3] [SPARK-1958] Calling .collect() on a SchemaRDD should call executeCollect() on the underlying query plan. --- .../src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index e855f36256bc..2b51f9d86109 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -368,6 +368,12 @@ class SchemaRDD( new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(logicalPlan.output, rdd))) } + // ======================================================================= + // Overriden RDD actions + // ======================================================================= + + override def collect() = queryExecution.executedPlan.executeCollect() + // ======================================================================= // Base RDD functions that do NOT change schema // ======================================================================= From 825097680db1279996241a7544e0f07b2ce60923 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 2 Jun 2014 17:25:38 +0800 Subject: [PATCH 2/3] Added return type explicitly for public API --- sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 2b51f9d86109..8855c4e87691 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -372,7 +372,7 @@ class SchemaRDD( // Overriden RDD actions // ======================================================================= - override def collect() = queryExecution.executedPlan.executeCollect() + override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect() // ======================================================================= // Base RDD functions that do NOT change schema From bdc4a145e5bcfc372985600bddafa82a63a51ca1 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 2 Jun 2014 23:26:49 +0800 Subject: [PATCH 3/3] Copy rows to present immutable data to users --- .../main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 2 +- .../scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 235a9b169246..4613df103943 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -49,7 +49,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging { /** * Runs this query returning the result as an array. */ - def executeCollect(): Array[Row] = execute().collect() + def executeCollect(): Array[Row] = execute().map(_.copy()).collect() protected def buildRow(values: Seq[Any]): Row = new GenericRow(values.toArray) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index f9731e82e492..b973ceba5fec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -201,7 +201,7 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { } test("insert (appending) to same table via Scala API") { - sql("INSERT INTO testsource SELECT * FROM testsource").collect() + sql("INSERT INTO testsource SELECT * FROM testsource") val double_rdd = sql("SELECT * FROM testsource").collect() assert(double_rdd != null) assert(double_rdd.size === 30)