Commit 13f8cfd

[SPARK-2135][SQL] Use planner for in-memory scans
Author: Michael Armbrust <[email protected]>

Closes apache#1072 from marmbrus/cachedStars and squashes the following commits:

8757c8e [Michael Armbrust] Use planner for in-memory scans.
1 parent f95ac68
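
In short: before this change, cacheTable planned the query eagerly and froze the physical InMemoryColumnarTableScan into the logical plan via SparkLogicalPlan, so later queries over the cached table bypassed the planner. The commit splits the old node into a logical InMemoryRelation (what the catalog registers) and a physical InMemoryColumnarTableScan (what the new InMemoryScans strategy emits), which lets column pruning apply to cached data. A minimal sketch of the change in cacheTable, paraphrased from the SQLContext diff below (here executedPlan abbreviates executePlan(currentTable).executedPlan):

    // Before: an already-planned physical scan, wrapped and registered directly.
    catalog.registerTable(None, tableName,
      SparkLogicalPlan(
        InMemoryColumnarTableScan(currentTable.output, executedPlan, useCompression)))

    // After: a logical node is registered; the planner chooses the scan per query.
    catalog.registerTable(None, tableName,
      InMemoryRelation(useCompression, executedPlan))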

File tree: 10 files changed (+75, -35 lines)


sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 7 additions & 7 deletions

@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor

-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.columnar.InMemoryRelation

 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.SparkStrategies
@@ -166,22 +166,21 @@ class SQLContext(@transient val sparkContext: SparkContext)
     val useCompression =
       sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
     val asInMemoryRelation =
-      InMemoryColumnarTableScan(
-        currentTable.output, executePlan(currentTable).executedPlan, useCompression)
+      InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)

-    catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
+    catalog.registerTable(None, tableName, asInMemoryRelation)
   }

   /** Removes the specified table from the in-memory cache. */
   def uncacheTable(tableName: String): Unit = {
     EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
       // This is kind of a hack to make sure that if this was just an RDD registered as a table,
       // we reregister the RDD as a table.
-      case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd, _)) =>
+      case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
         catalog.registerTable(None, tableName, SparkLogicalPlan(e))
-      case SparkLogicalPlan(inMem: InMemoryColumnarTableScan) =>
+      case inMem: InMemoryRelation =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
       case plan => throw new IllegalArgumentException(s"Table $tableName is not cached: $plan")
@@ -192,7 +191,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def isCached(tableName: String): Boolean = {
     val relation = catalog.lookupRelation(None, tableName)
     EliminateAnalysisOperators(relation) match {
-      case SparkLogicalPlan(_: InMemoryColumnarTableScan) => true
+      case _: InMemoryRelation => true
       case _ => false
     }
   }
@@ -208,6 +207,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       PartialAggregation ::
       LeftSemiJoin ::
       HashJoin ::
+      InMemoryScans ::
       ParquetOperations ::
       BasicOperators ::
       CartesianProduct ::
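
The new InMemoryScans entry sits ahead of ParquetOperations and BasicOperators, so cached relations are claimed before the generic strategies see them. Roughly, Catalyst's QueryPlanner just tries each strategy in order and keeps whatever plans it returns; a minimal sketch of that dispatch, simplified rather than the verbatim QueryPlanner code:

    // Strategies earlier in the list get the first chance to plan a node.
    def plan(logical: LogicalPlan): Iterator[SparkPlan] =
      strategies.view.flatMap(strategy => strategy(logical)).toIterator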

sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala

Lines changed: 31 additions & 8 deletions

@@ -17,18 +17,29 @@

 package org.apache.spark.sql.columnar

+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
 import org.apache.spark.sql.Row
 import org.apache.spark.SparkConf

-private[sql] case class InMemoryColumnarTableScan(
-    attributes: Seq[Attribute],
-    child: SparkPlan,
-    useCompression: Boolean)
-  extends LeafNode {
+object InMemoryRelation {
+  def apply(useCompression: Boolean, child: SparkPlan): InMemoryRelation =
+    new InMemoryRelation(child.output, useCompression, child)
+}

-  override def output: Seq[Attribute] = attributes
+private[sql] case class InMemoryRelation(
+    output: Seq[Attribute],
+    useCompression: Boolean,
+    child: SparkPlan)
+  extends LogicalPlan with MultiInstanceRelation {
+
+  override def children = Seq.empty
+  override def references = Set.empty
+
+  override def newInstance() =
+    new InMemoryRelation(output.map(_.newInstance), useCompression, child).asInstanceOf[this.type]

   lazy val cachedColumnBuffers = {
     val output = child.output
@@ -55,14 +66,26 @@ private[sql] case class InMemoryColumnarTableScan(
     cached.count()
     cached
   }
+}
+
+private[sql] case class InMemoryColumnarTableScan(
+    attributes: Seq[Attribute],
+    relation: InMemoryRelation)
+  extends LeafNode {
+
+  override def output: Seq[Attribute] = attributes

   override def execute() = {
-    cachedColumnBuffers.mapPartitions { iterator =>
+    relation.cachedColumnBuffers.mapPartitions { iterator =>
       val columnBuffers = iterator.next()
       assert(!iterator.hasNext)

       new Iterator[Row] {
-        val columnAccessors = columnBuffers.map(ColumnAccessor(_))
+        // Find the ordinals of the requested columns. If none are requested, use the first.
+        val requestedColumns =
+          if (attributes.isEmpty) Seq(0) else attributes.map(relation.output.indexOf(_))
+
+        val columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
         val nextRow = new GenericMutableRow(columnAccessors.length)

         override def next() = {
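
The pruning added to execute() above is a plain ordinal lookup: each requested attribute is located in the relation's full output, and only those column buffers get accessors, so unreferenced columns are never decompressed. The Seq(0) fallback presumably keeps one column driving row iteration when no attributes are requested at all. A standalone sketch of the mapping, with hypothetical column names:

    val relationOutput = Seq("key", "value", "extra")  // stands in for relation.output
    val requested = Seq("value")                       // stands in for attributes
    val requestedColumns =
      if (requested.isEmpty) Seq(0) else requested.map(relationOutput.indexOf(_))
    // requestedColumns == Seq(1): only the "value" buffers are read.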

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 0 additions & 2 deletions

@@ -77,8 +77,6 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
     SparkLogicalPlan(
       alreadyPlanned match {
         case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
-        case scan @ InMemoryColumnarTableScan(output, _, _) =>
-          scan.copy(attributes = output.map(_.newInstance))
         case _ => sys.error("Multiple instance of the same relation detected.")
       }).asInstanceOf[this.type]
   }
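
This case disappears because multi-instancing now lives on the logical node itself: InMemoryRelation extends MultiInstanceRelation and overrides newInstance() (see the columnar diff above), so SparkLogicalPlan no longer needs to know about in-memory scans. The relevant override, repeated for reference:

    override def newInstance() =
      new InMemoryRelation(output.map(_.newInstance), useCompression, child).asInstanceOf[this.type]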

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 13 additions & 0 deletions

@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.parquet._
+import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}

 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>
@@ -191,6 +192,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }

+  object InMemoryScans extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>
+        pruneFilterProject(
+          projectList,
+          filters,
+          identity[Seq[Expression]], // No filters are pushed down.
+          InMemoryColumnarTableScan(_, mem)) :: Nil
+      case _ => Nil
+    }
+  }
+
   // Can we automate these 'pass through' operations?
   object BasicOperators extends Strategy {
     def numPartitions = self.numPartitions
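
PhysicalOperation collapses any stack of Project and Filter operators above a leaf into (projectList, filters, relation), and pruneFilterProject rebuilds them around the physical scan. Because identity is passed as the pushdown function, every predicate stays in a Filter node above the scan; only column pruning reaches InMemoryColumnarTableScan. For a query like SELECT key FROM cached WHERE key = 1, the strategy should therefore produce a physical plan shaped roughly like this (a sketch, not captured planner output):

    Filter (key = 1)                       // predicates evaluated above the scan
      InMemoryColumnarTableScan [key]      // only the 'key' column buffers are materialized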

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 9 additions & 6 deletions

@@ -18,8 +18,7 @@
 package org.apache.spark.sql

 import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
-import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
 import org.apache.spark.sql.test.TestSQLContext

 class CachedTableSuite extends QueryTest {
@@ -34,7 +33,7 @@ class CachedTableSuite extends QueryTest {
     )

     TestSQLContext.table("testData").queryExecution.analyzed match {
-      case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
+      case _ : InMemoryRelation => // Found evidence of caching
       case noCache => fail(s"No cache node found in plan $noCache")
     }

@@ -46,7 +45,7 @@ class CachedTableSuite extends QueryTest {
     )

     TestSQLContext.table("testData").queryExecution.analyzed match {
-      case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
+      case cachePlan: InMemoryRelation =>
         fail(s"Table still cached after uncache: $cachePlan")
       case noCache => // Table uncached successfully
     }
@@ -61,13 +60,17 @@ class CachedTableSuite extends QueryTest {

   test("SELECT Star Cached Table") {
     TestSQLContext.sql("SELECT * FROM testData").registerAsTable("selectStar")
     TestSQLContext.cacheTable("selectStar")
-    TestSQLContext.sql("SELECT * FROM selectStar")
+    TestSQLContext.sql("SELECT * FROM selectStar WHERE key = 1").collect()
     TestSQLContext.uncacheTable("selectStar")
   }

   test("Self-join cached") {
+    val unCachedAnswer =
+      TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key").collect()
     TestSQLContext.cacheTable("testData")
-    TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key")
+    checkAnswer(
+      TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key"),
+      unCachedAnswer.toSeq)
     TestSQLContext.uncacheTable("testData")
   }
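
The updated tests pin down behavior rather than plan shape: results from a cached table must match the uncached answer, and a filtered select over a cached star-select must actually execute, hence the added .collect(). The user-level flow they exercise, sketched against this era's API:

    TestSQLContext.cacheTable("testData")                // registers an InMemoryRelation
    val rows = TestSQLContext.sql(
      "SELECT * FROM testData WHERE key = 1").collect()  // planned via InMemoryScans
    TestSQLContext.uncacheTable("testData")              // unpersists cachedColumnBuffers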

sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 3 additions & 3 deletions

@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {

   test("simple columnar query") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
+    val scan = InMemoryRelation(useCompression = true, plan)

     checkAnswer(scan, testData.collect().toSeq)
   }

   test("projection") {
     val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
+    val scan = InMemoryRelation(useCompression = true, plan)

     checkAnswer(scan, testData.collect().map {
       case Row(key: Int, value: String) => value -> key
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {

   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
+    val scan = InMemoryRelation(useCompression = true, plan)

     checkAnswer(scan, testData.collect().toSeq)
     checkAnswer(scan, testData.collect().toSeq)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 1 addition & 0 deletions

@@ -230,6 +230,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       CommandStrategy(self),
       TakeOrdered,
       ParquetOperations,
+      InMemoryScans,
       HiveTableScans,
       DataSinks,
       Scripts,

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 4 additions & 3 deletions

@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.execution.SparkLogicalPlan
 import org.apache.spark.sql.hive.execution.{HiveTableScan, InsertIntoHiveTable}
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}

 /* Implicit conversions */
 import scala.collection.JavaConversions._
@@ -130,8 +130,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) =>
       castChildOutput(p, table, child)

-    case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
-        _, HiveTableScan(_, table, _), _)), _, child, _) =>
+    case p @ logical.InsertIntoTable(
+           InMemoryRelation(_, _,
+             HiveTableScan(_, table, _)), _, child, _) =>
       castChildOutput(p, table, child)
   }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 4 additions & 3 deletions

@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.columnar.InMemoryRelation

 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
@@ -44,8 +44,9 @@ private[hive] trait HiveStrategies {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
-      case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
-          _, HiveTableScan(_, table, _), _)), partition, child, overwrite) =>
+      case logical.InsertIntoTable(
+             InMemoryRelation(_, _,
+               HiveTableScan(_, table, _)), partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case _ => Nil
     }
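
Both Hive files rewrite the same pattern: a cached Hive table is now an InMemoryRelation whose child is the original physical HiveTableScan, so an INSERT through the cached name still resolves to the underlying Metastore table. The nested match from the diff above, compacted onto fewer lines for readability (same semantics):

    case logical.InsertIntoTable(
        InMemoryRelation(_, _, HiveTableScan(_, table, _)), partition, child, overwrite) =>
      InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil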

sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala

Lines changed: 3 additions & 3 deletions

@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive

 import org.apache.spark.sql.execution.SparkLogicalPlan
-import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
 import org.apache.spark.sql.hive.execution.HiveComparisonTest
 import org.apache.spark.sql.hive.test.TestHive

@@ -34,7 +34,7 @@ class CachedTableSuite extends HiveComparisonTest {

   test("check that table is cached and uncache") {
     TestHive.table("src").queryExecution.analyzed match {
-      case SparkLogicalPlan(_ : InMemoryColumnarTableScan) => // Found evidence of caching
+      case _ : InMemoryRelation => // Found evidence of caching
       case noCache => fail(s"No cache node found in plan $noCache")
     }
     TestHive.uncacheTable("src")
@@ -45,7 +45,7 @@ class CachedTableSuite extends HiveComparisonTest {

   test("make sure table is uncached") {
     TestHive.table("src").queryExecution.analyzed match {
-      case cachePlan @ SparkLogicalPlan(_ : InMemoryColumnarTableScan) =>
+      case cachePlan: InMemoryRelation =>
         fail(s"Table still cached after uncache: $cachePlan")
       case noCache => // Table uncached successfully
     }
