From d6c25dc3eac1f63eaf3d1ef7cfd1e9a60c6ff79a Mon Sep 17 00:00:00 2001 From: Jacky Li Date: Tue, 28 Oct 2014 01:07:33 -0700 Subject: [PATCH 1/2] update strategy to use the newly added bulkload logical plan --- .../scala/org/apache/spark/sql/hbase/HBaseStrategies.scala | 2 +- .../apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala index 68301923e258f..4d4d4ca22f83a 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala @@ -95,7 +95,7 @@ private[hbase] trait HBaseStrategies extends QueryPlanner[SparkPlan] { tableName, nameSpace, hbaseTableName, colsSeq, keyCols, nonKeyCols) (hbaseSQLContext)) - case logical.BulkLoadIntoTable(table: HBaseRelation, path) => + case logical.LoadDataIntoTable(path, table: HBaseRelation, isLocal) => execution.BulkLoadIntoTable(table, path)(hbaseSQLContext) :: Nil case InsertIntoTable(table: HBaseRelation, partition, child, _) => new InsertIntoHBaseTable(table, planLater(child))(hbaseSQLContext) :: Nil diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala index 8aa04ced45e1f..4100eb9a8940a 100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala @@ -29,13 +29,6 @@ case class CreateHBaseTablePlan(tableName: String, case class DropTablePlan(tableName: String) extends Command - -case class BulkLoadIntoTable(table: HBaseRelation, path: String) extends LeafNode { - override def output = Seq.empty - // TODO:need resolved here? - -} - case class LoadDataIntoTable(path: String, table: String, isLocal: Boolean) extends LeafNode { override def output = Seq.empty } From 2a150c7bd20b2bdf7fa00c885059b3aea121869b Mon Sep 17 00:00:00 2001 From: Jacky Li Date: Tue, 28 Oct 2014 01:17:31 -0700 Subject: [PATCH 2/2] preserving isLocal for checking in execution --- .../main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala | 2 +- .../org/apache/spark/sql/hbase/execution/HBaseOperators.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala index 4d4d4ca22f83a..3b64b94ebef9f 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala @@ -96,7 +96,7 @@ private[hbase] trait HBaseStrategies extends QueryPlanner[SparkPlan] { colsSeq, keyCols, nonKeyCols) (hbaseSQLContext)) case logical.LoadDataIntoTable(path, table: HBaseRelation, isLocal) => - execution.BulkLoadIntoTable(table, path)(hbaseSQLContext) :: Nil + execution.BulkLoadIntoTable(path, table, isLocal)(hbaseSQLContext) :: Nil case InsertIntoTable(table: HBaseRelation, partition, child, _) => new InsertIntoHBaseTable(table, planLater(child))(hbaseSQLContext) :: Nil case logical.DropTablePlan(tableName) => Seq(execution.DropHbaseTableCommand(tableName)(hbaseSQLContext)) diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala index a9a4a5f264939..26d704f15ef48 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala @@ -74,7 +74,7 @@ case class InsertIntoHBaseTable( } @DeveloperApi -case class BulkLoadIntoTable(relation: HBaseRelation, path: String)( +case class BulkLoadIntoTable(path: String, relation: HBaseRelation, isLocal: Boolean)( @transient hbContext: HBaseSQLContext) extends LeafNode { val jobConf = new JobConf(hbContext.sc.hadoopConfiguration)