From 8ecfad2fd4aebda789df97a6c4111207030c057a Mon Sep 17 00:00:00 2001 From: wangfei Date: Mon, 27 Oct 2014 14:22:46 -0700 Subject: [PATCH 01/10] draft supoort bulkload --- .../spark/sql/hbase/HBasePartitioner.scala | 144 ++++++++++++++++++ .../spark/sql/hbase/HBaseSQLParser.scala | 2 +- .../spark/sql/hbase/HBaseStrategies.scala | 8 +- .../apache/spark/sql/hbase/HadoopReader.scala | 80 ++++++++++ .../sql/hbase/execution/HBaseOperators.scala | 44 +++++- .../sql/hbase/logical/HBaseLogicalPlans.scala | 14 +- .../sql/hbase/HBasePartitionerSuite.scala | 33 ++++ 7 files changed, 318 insertions(+), 7 deletions(-) create mode 100644 sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala create mode 100644 sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HadoopReader.scala create mode 100644 sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala new file mode 100644 index 0000000000000..e833cbfa58464 --- /dev/null +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hbase + +import java.io.{ObjectInputStream, ObjectOutputStream, IOException} +import scala.Array +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkEnv +import org.apache.spark.Partitioner +import org.apache.spark.util.{Utils, CollectionsUtils} +import org.apache.spark.serializer.JavaSerializer +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.client.HTable + +class HBasePartitioner [K : Ordering : ClassTag, V]( + @transient rdd: RDD[_ <: Product2[K,V]])(splitKeys: Array[K]) + extends Partitioner { + + private var ordering = implicitly[Ordering[K]] + + private var rangeBounds: Array[K] = splitKeys + + def numPartitions = rangeBounds.length + 1 + + private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K] + + // todo: test this + def getPartition(key: Any): Int = { + val k = key.asInstanceOf[K] + var partition = 0 + if (rangeBounds.length <= 128) { + // If we have less than 128 partitions naive search + while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { + partition += 1 + } + } else { + // Determine which binary search method to use only once. 
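+      // Example: with rangeBounds = [10, 20, 30], a key of 25 has no exact match, so
+      // binarySearch returns -(insertion point) - 1 = -3 and the correction below yields
+      // partition 2 (the range (20, 30]).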
+ partition = binarySearch(rangeBounds, k) + // binarySearch either returns the match location or -[insertion point]-1 + if (partition < 0) { + partition = -partition-1 + } + if (partition > rangeBounds.length) { + partition = rangeBounds.length + } + } + partition + } + + override def equals(other: Any): Boolean = other match { + case r: HBasePartitioner[_,_] => + r.rangeBounds.sameElements(rangeBounds) + case _ => + false + } + + override def hashCode(): Int = { + val prime = 31 + var result = 1 + var i = 0 + while (i < rangeBounds.length) { + result = prime * result + rangeBounds(i).hashCode + i += 1 + } + result = prime * result + result + } + + @throws(classOf[IOException]) + private def writeObject(out: ObjectOutputStream) { + val sfactory = SparkEnv.get.serializer + sfactory match { + case js: JavaSerializer => out.defaultWriteObject() + case _ => + out.writeObject(ordering) + out.writeObject(binarySearch) + + val ser = sfactory.newInstance() + Utils.serializeViaNestedStream(out, ser) { stream => + stream.writeObject(scala.reflect.classTag[Array[K]]) + stream.writeObject(rangeBounds) + } + } + } + + @throws(classOf[IOException]) + private def readObject(in: ObjectInputStream) { + val sfactory = SparkEnv.get.serializer + sfactory match { + case js: JavaSerializer => in.defaultReadObject() + case _ => + ordering = in.readObject().asInstanceOf[Ordering[K]] + binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int] + + val ser = sfactory.newInstance() + Utils.deserializeViaNestedStream(in, ser) { ds => + implicit val classTag = ds.readObject[ClassTag[Array[K]]]() + rangeBounds = ds.readObject[Array[K]]() + } + } + } +} + +//Todo: test this +object HBasePartitioner { + implicit def orderingRowKey[ImmutableBytesWritable]: Ordering[ImmutableBytesWritable] = + OrderingRowKey.asInstanceOf[Ordering[ImmutableBytesWritable]] + + /** + * Return the start keys of all of the regions in this table, + * as a list of ImmutableBytesWritable. 
+ * Todo: should move to hbase relation after code clean + */ + def getRegionStartKeys(table: HTable) = { + val byteKeys: Array[Array[Byte]] = table.getStartKeys + val ret = ArrayBuffer[ImmutableBytesWritable]() + for (byteKey <- byteKeys) { + ret += new ImmutableBytesWritable(byteKey) + } + ret + } +} + +object OrderingRowKey extends Ordering[ImmutableBytesWritable] { + def compare(a: ImmutableBytesWritable, b: ImmutableBytesWritable) = a.compareTo(b) +} \ No newline at end of file diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala index 23ae0970e20b3..4025296f05e34 100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser} import org.apache.spark.sql.catalyst.SparkSQLParser -import org.apache.spark.sql.hbase.logical.{DropTablePlan, CreateHBaseTablePlan} +import org.apache.spark.sql.hbase.logical.{CreateHBaseTablePlan, DropTablePlan} class HBaseSQLParser extends SqlParser { protected val BULK = Keyword("BULK") diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala index 576b580fab557..68301923e258f 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner} import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution._ import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hbase.execution._ +import org.apache.spark.sql.hbase.execution.{HBaseSQLTableScan, InsertIntoHBaseTable} /** * HBaseStrategies @@ -91,13 +91,15 @@ private[hbase] trait HBaseStrategies extends QueryPlanner[SparkPlan] { case logical.CreateHBaseTablePlan( tableName, nameSpace, hbaseTableName, colsSeq, keyCols, nonKeyCols) => - Seq(CreateHBaseTableCommand( + Seq(execution.CreateHBaseTableCommand( tableName, nameSpace, hbaseTableName, colsSeq, keyCols, nonKeyCols) (hbaseSQLContext)) + case logical.BulkLoadIntoTable(table: HBaseRelation, path) => + execution.BulkLoadIntoTable(table, path)(hbaseSQLContext) :: Nil case InsertIntoTable(table: HBaseRelation, partition, child, _) => new InsertIntoHBaseTable(table, planLater(child))(hbaseSQLContext) :: Nil - case logical.DropTablePlan(tableName) => Seq(DropHbaseTableCommand(tableName)(hbaseSQLContext)) + case logical.DropTablePlan(tableName) => Seq(execution.DropHbaseTableCommand(tableName)(hbaseSQLContext)) case _ => Nil } } diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HadoopReader.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HadoopReader.scala new file mode 100644 index 0000000000000..aee77ae485a33 --- /dev/null +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HadoopReader.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hbase + +import org.apache.hadoop.mapred.JobConf +import org.apache.spark.rdd.HadoopRDD +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.{SparkContext, SerializableWritable} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.KeyValue + +/** + * Helper class for scanning files stored in Hadoop - e.g., to read text file when bulk loading. + */ +private[hbase] +class HadoopReader( + @transient sc: SparkContext, + @transient jobConf: JobConf) { + + private val broadcastedHiveConf = + sc.broadcast(new SerializableWritable(jobConf)) + + private val minSplitsPerRDD = + sc.getConf.get("spark.sql.hbase.minPartitions", sc.defaultMinPartitions.toString).toInt + + private val splitRegex = sc.getConf.get("spark.sql.hbase.splitRegex", ",") + + val inputFormatClass = classOf[TextInputFormat] + + val rowKeyIds = "0,1,2" + + def makeBulkLoadRDD = { + val rdd = new HadoopRDD( + sc, + broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]], + None, + inputFormatClass, + classOf[LongWritable], + classOf[Text], + minSplitsPerRDD) + + // Todo: use mapPartitions more better, now just simply code + // Only take the value (skip the key) because Hbase works only with values. + rdd.map { value => + // Todo: needs info which field is rowkey and value from HbaseRelation here + val fields = value._2.toString.split(splitRegex) + val rowKey = Bytes.toBytes(fields(0)) + val rowKeyWritable = new ImmutableBytesWritable(rowKey) + val family = Bytes.toBytes("cf") + val qualifier = Bytes.toBytes("count") + val hbaseValue = Bytes.toBytes(fields(1)) +// val keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue) + // we should use Put?, which is better? keyvalue is for one column in column family, right? 
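+      // A KeyValue represents a single cell (row key + family + qualifier + value), while a
+      // Put groups one or more cells for the same row key, so either shape can carry this data.
+      // Put is the more convenient carrier once a row has several non-key columns, assuming
+      // saveAsHFile later expands each Put into its cells (as HBase's PutSortReducer does)
+      // before writing HFiles.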
+ val put = new Put(rowKey) + put.add(family, qualifier, hbaseValue) + (rowKeyWritable, put) + } + } + +} \ No newline at end of file diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala index 6473d6e0c5619..a9a4a5f264939 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala @@ -18,10 +18,15 @@ package org.apache.spark.sql.hbase.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{ShuffledRDD, RDD} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.{LeafNode, UnaryNode, SparkPlan} -import org.apache.spark.sql.hbase.{HBaseSQLReaderRDD, HBaseSQLContext, HBaseRelation} +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.client.Put +import org.apache.spark.SerializableWritable +import org.apache.spark.sql.hbase._ +import org.apache.spark.sql.hbase.HBaseRelation /** * :: DeveloperApi :: @@ -67,3 +72,38 @@ case class InsertIntoHBaseTable( override def output = child.output } + +@DeveloperApi +case class BulkLoadIntoTable(relation: HBaseRelation, path: String)( + @transient hbContext: HBaseSQLContext) extends LeafNode { + + val jobConf = new JobConf(hbContext.sc.hadoopConfiguration) + + val hadoopReader = new HadoopReader(hbContext.sparkContext, jobConf) + + // TODO: get from raltion (or config) + val splitKeys = ??? + + override def execute() = { + implicit val ordering = HBasePartitioner.orderingRowKey + .asInstanceOf[Ordering[ImmutableBytesWritable]] + val rdd = hadoopReader.makeBulkLoadRDD + val partitioner = new HBasePartitioner(rdd)(splitKeys) + val shuffled = + new ShuffledRDD[ImmutableBytesWritable, Put, Put](rdd, partitioner).setKeyOrdering(ordering) + + val jobConfSer = new SerializableWritable(jobConf) + saveAsHFile(shuffled, jobConfSer) + // bulk load is like a command, so return null is ok here + null + } + + def saveAsHFile( + rdd: RDD[(ImmutableBytesWritable, Put)], + jobConf: SerializableWritable[JobConf]) { + ??? + } + + override def output = Nil + +} diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala index 7befe2d9d9d17..8aa04ced45e1f 100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala @@ -16,7 +16,8 @@ */ package org.apache.spark.sql.hbase.logical -import org.apache.spark.sql.catalyst.plans.logical.Command +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Command} +import org.apache.spark.sql.hbase.HBaseRelation case class CreateHBaseTablePlan(tableName: String, nameSpace: String, @@ -27,3 +28,14 @@ case class CreateHBaseTablePlan(tableName: String, ) extends Command case class DropTablePlan(tableName: String) extends Command + + +case class BulkLoadIntoTable(table: HBaseRelation, path: String) extends LeafNode { + override def output = Seq.empty + // TODO:need resolved here? 
+ +} + +case class LoadDataIntoTable(path: String, table: String, isLocal: Boolean) extends LeafNode { + override def output = Seq.empty +} diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala new file mode 100644 index 0000000000000..54f16939d41e1 --- /dev/null +++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hbase + +import org.scalatest.FunSuite +import org.apache.spark.{SparkConf, LocalSparkContext, SparkContext, Logging} + +class HBasePartitionerSuite extends FunSuite with LocalSparkContext with Logging { + + val conf = new SparkConf(loadDefaults = false) + + test("test hbase partitioner") { + sc = new SparkContext("local", "test") + val pairs = sc.parallelize((1 to 40).map(x => (x, x)), 4) + val groups = pairs.groupByKey(4).collect() + } + +} From 2a4336aa201dca30652651480e2e50f4b9dca5bb Mon Sep 17 00:00:00 2001 From: Jacky Li Date: Mon, 27 Oct 2014 17:03:11 -0700 Subject: [PATCH 02/10] adding impl and test for parser for bulkload --- .../spark/sql/hbase/HBaseSQLParser.scala | 22 +++++++++++++++++-- .../sql/hbase/logical/HBaseLogicalPlans.scala | 18 +++++++++++---- .../sql/hbase/HBasePartitionerSuite.scala | 2 +- 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala index 4025296f05e34..e518c6d1c4dd2 100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala @@ -20,9 +20,15 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser} import org.apache.spark.sql.catalyst.SparkSQLParser -import org.apache.spark.sql.hbase.logical.{CreateHBaseTablePlan, DropTablePlan} +import org.apache.spark.sql.hbase.logical.{CreateHBaseTablePlan, DropTablePlan, LoadDataIntoTablePlan} class HBaseSQLParser extends SqlParser { + + protected val DATA = Keyword("DATA") + protected val LOAD = Keyword("LOAD") + protected val LOCAL = Keyword("LOCAL") + protected val INPATH = Keyword("INPATH") + protected val BULK = Keyword("BULK") protected val CREATE = Keyword("CREATE") protected val DROP = Keyword("DROP") @@ -55,7 +61,7 @@ class HBaseSQLParser extends SqlParser { | EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ DISTINCT.? 
^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2))} ) - | insert | create | drop | alter + | insert | create | drop | alter | load ) override protected lazy val insert: Parser[LogicalPlan] = @@ -137,6 +143,18 @@ class HBaseSQLParser extends SqlParser { case tn ~ op ~ tc ~ cf => null } + protected lazy val load: Parser[LogicalPlan] = + ( + (LOAD ~> DATA ~> INPATH ~> stringLit) ~ + (opt(OVERWRITE) ~> INTO ~> TABLE ~> relation) ^^ { + case filePath ~ table => LoadDataIntoTablePlan(filePath, table, false) + } + | (LOAD ~> DATA ~> LOCAL ~> INPATH ~> stringLit) ~ + (opt(OVERWRITE) ~> INTO ~> TABLE ~> relation) ^^ { + case filePath ~ table => LoadDataIntoTablePlan(filePath, table, true) + } + ) + protected lazy val tableCol: Parser[(String, String)] = ident ~ (STRING | BYTE | SHORT | INTEGER | LONG | FLOAT | DOUBLE | BOOLEAN) ^^ { case e1 ~ e2 => (e1, e2) diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala index 8aa04ced45e1f..a741be11bc73c 100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.hbase.logical -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Command} +import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, LeafNode, LogicalPlan, Command} import org.apache.spark.sql.hbase.HBaseRelation case class CreateHBaseTablePlan(tableName: String, @@ -29,13 +29,23 @@ case class CreateHBaseTablePlan(tableName: String, case class DropTablePlan(tableName: String) extends Command - case class BulkLoadIntoTable(table: HBaseRelation, path: String) extends LeafNode { override def output = Seq.empty // TODO:need resolved here? 
} -case class LoadDataIntoTable(path: String, table: String, isLocal: Boolean) extends LeafNode { - override def output = Seq.empty +/** + * Logical plan for Bulkload + * @param path input data file path + * @param child target relation + * @param isLocal using HDFS or local file + */ +case class LoadDataIntoTablePlan(path: String, + child: LogicalPlan, + isLocal: Boolean) extends UnaryNode { + + override def output = Nil + + override def toString = s"LogicalPlan: LoadDataIntoTable(LOAD $path INTO $child)" } diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala index 54f16939d41e1..fcdde0b809576 100644 --- a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala +++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hbase import org.scalatest.FunSuite import org.apache.spark.{SparkConf, LocalSparkContext, SparkContext, Logging} - +import org.apache.spark.SparkContext._ class HBasePartitionerSuite extends FunSuite with LocalSparkContext with Logging { val conf = new SparkConf(loadDefaults = false) From 460bd68ef1b2315005734c06848fcf4a41d28e67 Mon Sep 17 00:00:00 2001 From: Jacky Li Date: Mon, 27 Oct 2014 17:03:28 -0700 Subject: [PATCH 03/10] adding impl and test for parser for bulkload --- .../sql/hbase/HBaseBulkloadParserTest.scala | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBulkloadParserTest.scala diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBulkloadParserTest.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBulkloadParserTest.scala new file mode 100644 index 0000000000000..a2b24e0b7af31 --- /dev/null +++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBulkloadParserTest.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hbase + +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.hbase.logical.LoadDataIntoTablePlan +import org.scalatest.{FunSuite, Matchers} + +/** + * Test Suite for HBase bulkload feature + * Created by jackylk on 2014/10/25. 
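+ * Verifies that statements of the form
+ *   LOAD DATA [LOCAL] INPATH '<path>' INTO TABLE <table>
+ * parse into a LoadDataIntoTablePlan whose child is an UnresolvedRelation.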
+ */ + +class HBaseBulkloadParserTest extends FunSuite with Matchers { + + // Test if we can parse 'LOAD DATA LOCAL INPATH './usr/file.csv' INTO TABLE tb' + test("bulkload parser test, local file") { + + val parser = new HBaseSQLParser() + val sql = raw"LOAD DATA LOCAL INPATH './usr/file.csv' INTO TABLE tb" + //val sql = "select" + + val plan: LogicalPlan = parser(sql) + assert(plan != null) + assert(plan.isInstanceOf[LoadDataIntoTablePlan]) + + val l = plan.asInstanceOf[LoadDataIntoTablePlan] + assert(l.path.equals(raw"./usr/file.csv")) + assert(l.isLocal) + + assert(plan.children(0).isInstanceOf[UnresolvedRelation]) + val r = plan.children(0).asInstanceOf[UnresolvedRelation] + assert(r.tableName.equals("tb")) + } + + // Test if we can parse 'LOAD DATA INPATH '/usr/hdfsfile.csv' INTO TABLE tb' + test("bulkload parser test, load hdfs file") { + + val parser = new HBaseSQLParser() + val sql = raw"LOAD DATA INPATH '/usr/hdfsfile.csv' INTO TABLE tb" + //val sql = "select" + + val plan: LogicalPlan = parser(sql) + assert(plan != null) + assert(plan.isInstanceOf[LoadDataIntoTablePlan]) + + val l = plan.asInstanceOf[LoadDataIntoTablePlan] + assert(l.path.equals(raw"/usr/hdfsfile.csv")) + assert(!l.isLocal) + assert(plan.children(0).isInstanceOf[UnresolvedRelation]) + val r = plan.children(0).asInstanceOf[UnresolvedRelation] + assert(r.tableName.equals("tb")) + } +} From d6c25dc3eac1f63eaf3d1ef7cfd1e9a60c6ff79a Mon Sep 17 00:00:00 2001 From: Jacky Li Date: Tue, 28 Oct 2014 01:07:33 -0700 Subject: [PATCH 04/10] update strategy to use the newly added bulkload logical plan --- .../scala/org/apache/spark/sql/hbase/HBaseStrategies.scala | 2 +- .../apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala index 68301923e258f..4d4d4ca22f83a 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala @@ -95,7 +95,7 @@ private[hbase] trait HBaseStrategies extends QueryPlanner[SparkPlan] { tableName, nameSpace, hbaseTableName, colsSeq, keyCols, nonKeyCols) (hbaseSQLContext)) - case logical.BulkLoadIntoTable(table: HBaseRelation, path) => + case logical.LoadDataIntoTable(path, table: HBaseRelation, isLocal) => execution.BulkLoadIntoTable(table, path)(hbaseSQLContext) :: Nil case InsertIntoTable(table: HBaseRelation, partition, child, _) => new InsertIntoHBaseTable(table, planLater(child))(hbaseSQLContext) :: Nil diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala index 8aa04ced45e1f..4100eb9a8940a 100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala @@ -29,13 +29,6 @@ case class CreateHBaseTablePlan(tableName: String, case class DropTablePlan(tableName: String) extends Command - -case class BulkLoadIntoTable(table: HBaseRelation, path: String) extends LeafNode { - override def output = Seq.empty - // TODO:need resolved here? 
- -} - case class LoadDataIntoTable(path: String, table: String, isLocal: Boolean) extends LeafNode { override def output = Seq.empty } From 2a150c7bd20b2bdf7fa00c885059b3aea121869b Mon Sep 17 00:00:00 2001 From: Jacky Li Date: Tue, 28 Oct 2014 01:17:31 -0700 Subject: [PATCH 05/10] preserving isLocal for checking in execution --- .../main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala | 2 +- .../org/apache/spark/sql/hbase/execution/HBaseOperators.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala index 4d4d4ca22f83a..3b64b94ebef9f 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala @@ -96,7 +96,7 @@ private[hbase] trait HBaseStrategies extends QueryPlanner[SparkPlan] { colsSeq, keyCols, nonKeyCols) (hbaseSQLContext)) case logical.LoadDataIntoTable(path, table: HBaseRelation, isLocal) => - execution.BulkLoadIntoTable(table, path)(hbaseSQLContext) :: Nil + execution.BulkLoadIntoTable(path, table, isLocal)(hbaseSQLContext) :: Nil case InsertIntoTable(table: HBaseRelation, partition, child, _) => new InsertIntoHBaseTable(table, planLater(child))(hbaseSQLContext) :: Nil case logical.DropTablePlan(tableName) => Seq(execution.DropHbaseTableCommand(tableName)(hbaseSQLContext)) diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala index a9a4a5f264939..26d704f15ef48 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala @@ -74,7 +74,7 @@ case class InsertIntoHBaseTable( } @DeveloperApi -case class BulkLoadIntoTable(relation: HBaseRelation, path: String)( +case class BulkLoadIntoTable(path: String, relation: HBaseRelation, isLocal: Boolean)( @transient hbContext: HBaseSQLContext) extends LeafNode { val jobConf = new JobConf(hbContext.sc.hadoopConfiguration) From 58c3605dc9590836df782819e45f81dcbc466e83 Mon Sep 17 00:00:00 2001 From: Jacky Li Date: Tue, 28 Oct 2014 13:33:43 -0700 Subject: [PATCH 06/10] Rename HBaseLogicalPlans.scala to hbaseOperators.scala --- .../logical/{HBaseLogicalPlans.scala => hbaseOperators.scala} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/{HBaseLogicalPlans.scala => hbaseOperators.scala} (100%) diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/hbaseOperators.scala similarity index 100% rename from sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala rename to sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/hbaseOperators.scala From a98515862c36b0b836bee682c884036cee38c686 Mon Sep 17 00:00:00 2001 From: Jacky Li Date: Tue, 28 Oct 2014 13:34:36 -0700 Subject: [PATCH 07/10] Update hbaseOperators.scala --- .../org/apache/spark/sql/hbase/logical/hbaseOperators.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/hbaseOperators.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/hbaseOperators.scala index a741be11bc73c..bf6f0d59b49a8 
100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/hbaseOperators.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/hbaseOperators.scala @@ -29,12 +29,6 @@ case class CreateHBaseTablePlan(tableName: String, case class DropTablePlan(tableName: String) extends Command -case class BulkLoadIntoTable(table: HBaseRelation, path: String) extends LeafNode { - override def output = Seq.empty - // TODO:need resolved here? - -} - /** * Logical plan for Bulkload * @param path input data file path From b3b41f350d6dce9c6746cc36e680e7d6b24a0688 Mon Sep 17 00:00:00 2001 From: wangfei Date: Mon, 27 Oct 2014 17:01:03 -0700 Subject: [PATCH 08/10] test case for hbase partitioner --- .../hbase/SparkImmutableBytesWritable.java | 259 ++++++++++++++++++ .../spark/sql/hbase/HBasePartitioner.scala | 15 +- .../org/apache/spark/sql/hbase/package.scala | 2 + .../sql/hbase/HBasePartitionerSuite.scala | 28 +- .../org/apache/spark/sql/hbase/TestData.scala | 5 +- 5 files changed, 296 insertions(+), 13 deletions(-) create mode 100644 sql/hbase/src/main/java/org/apache/spark/sql/hbase/SparkImmutableBytesWritable.java diff --git a/sql/hbase/src/main/java/org/apache/spark/sql/hbase/SparkImmutableBytesWritable.java b/sql/hbase/src/main/java/org/apache/spark/sql/hbase/SparkImmutableBytesWritable.java new file mode 100644 index 0000000000000..59691e6f7fadf --- /dev/null +++ b/sql/hbase/src/main/java/org/apache/spark/sql/hbase/SparkImmutableBytesWritable.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hbase; + +import java.io.IOException; +import java.io.DataInput; +import java.io.DataOutput; +import java.util.Arrays; +import java.util.List; +import scala.Serializable; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; + +/** + * copy from hbase for Serializable issue + */ +public class SparkImmutableBytesWritable + implements WritableComparable, Serializable { + private byte[] bytes; + private int offset; + private int length; + + /** + * Create a zero-size sequence. + */ + public SparkImmutableBytesWritable() { + super(); + } + + /** + * Create a ImmutableBytesWritable using the byte array as the initial value. + * @param bytes This array becomes the backing storage for the object. + */ + public SparkImmutableBytesWritable(byte[] bytes) { + this(bytes, 0, bytes.length); + } + + /** + * Set the new ImmutableBytesWritable to the contents of the passed + * ibw. + * @param ibw the value to set this ImmutableBytesWritable to. 
+ */ + public SparkImmutableBytesWritable(final SparkImmutableBytesWritable ibw) { + this(ibw.get(), ibw.getOffset(), ibw.getLength()); + } + + /** + * Set the value to a given byte range + * @param bytes the new byte range to set to + * @param offset the offset in newData to start at + * @param length the number of bytes in the range + */ + public SparkImmutableBytesWritable(final byte[] bytes, final int offset, + final int length) { + this.bytes = bytes; + this.offset = offset; + this.length = length; + } + + /** + * Get the data from the BytesWritable. + * @return The data is only valid between offset and offset+length. + */ + public byte [] get() { + if (this.bytes == null) { + throw new IllegalStateException("Uninitialiized. Null constructor " + + "called w/o accompaying readFields invocation"); + } + return this.bytes; + } + + /** + * @param b Use passed bytes as backing array for this instance. + */ + public void set(final byte [] b) { + set(b, 0, b.length); + } + + /** + * @param b Use passed bytes as backing array for this instance. + * @param offset + * @param length + */ + public void set(final byte [] b, final int offset, final int length) { + this.bytes = b; + this.offset = offset; + this.length = length; + } + + /** + * @return the number of valid bytes in the buffer + * @deprecated use {@link #getLength()} instead + */ + @Deprecated + public int getSize() { + if (this.bytes == null) { + throw new IllegalStateException("Uninitialiized. Null constructor " + + "called w/o accompaying readFields invocation"); + } + return this.length; + } + + /** + * @return the number of valid bytes in the buffer + */ + public int getLength() { + if (this.bytes == null) { + throw new IllegalStateException("Uninitialiized. Null constructor " + + "called w/o accompaying readFields invocation"); + } + return this.length; + } + + /** + * @return offset + */ + public int getOffset(){ + return this.offset; + } + + public void readFields(final DataInput in) throws IOException { + this.length = in.readInt(); + this.bytes = new byte[this.length]; + in.readFully(this.bytes, 0, this.length); + this.offset = 0; + } + + public void write(final DataOutput out) throws IOException { + out.writeInt(this.length); + out.write(this.bytes, this.offset, this.length); + } + + // Below methods copied from BytesWritable + @Override + public int hashCode() { + int hash = 1; + for (int i = offset; i < offset + length; i++) + hash = (31 * hash) + (int)bytes[i]; + return hash; + } + + /** + * Define the sort order of the BytesWritable. + * @param that The other bytes writable + * @return Positive if left is bigger than right, 0 if they are equal, and + * negative if left is smaller than right. + */ + public int compareTo(SparkImmutableBytesWritable that) { + return WritableComparator.compareBytes( + this.bytes, this.offset, this.length, + that.bytes, that.offset, that.length); + } + + /** + * Compares the bytes in this object to the specified byte array + * @param that + * @return Positive if left is bigger than right, 0 if they are equal, and + * negative if left is smaller than right. 
+ */ + public int compareTo(final byte [] that) { + return WritableComparator.compareBytes( + this.bytes, this.offset, this.length, + that, 0, that.length); + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object right_obj) { + if (right_obj instanceof byte []) { + return compareTo((byte [])right_obj) == 0; + } + if (right_obj instanceof SparkImmutableBytesWritable) { + return compareTo((SparkImmutableBytesWritable)right_obj) == 0; + } + return false; + } + + /** + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(3*this.length); + final int endIdx = this.offset + this.length; + for (int idx = this.offset; idx < endIdx ; idx++) { + sb.append(' '); + String num = Integer.toHexString(0xff & this.bytes[idx]); + // if it is only one digit, add a leading 0. + if (num.length() < 2) { + sb.append('0'); + } + sb.append(num); + } + return sb.length() > 0 ? sb.substring(1) : ""; + } + + /** A Comparator optimized for ImmutableBytesWritable. + */ + public static class Comparator extends WritableComparator { + private BytesWritable.Comparator comparator = + new BytesWritable.Comparator(); + + /** constructor */ + public Comparator() { + super(SparkImmutableBytesWritable.class); + } + + /** + * @see org.apache.hadoop.io.WritableComparator#compare(byte[], int, int, byte[], int, int) + */ + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + return comparator.compare(b1, s1, l1, b2, s2, l2); + } + } + + static { // register this comparator + WritableComparator.define(SparkImmutableBytesWritable.class, new Comparator()); + } + + /** + * @param array List of byte []. + * @return Array of byte []. + */ + public static byte [][] toArray(final List array) { + // List#toArray doesn't work on lists of byte []. + byte[][] results = new byte[array.size()][]; + for (int i = 0; i < array.size(); i++) { + results[i] = array.get(i); + } + return results; + } + + /** + * Returns a copy of the bytes referred to by this writable + */ + public byte[] copyBytes() { + return Arrays.copyOfRange(bytes, offset, offset+length); + } +} diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala index e833cbfa58464..ed5799f8f526e 100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala @@ -27,7 +27,6 @@ import org.apache.spark.SparkEnv import org.apache.spark.Partitioner import org.apache.spark.util.{Utils, CollectionsUtils} import org.apache.spark.serializer.JavaSerializer -import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.client.HTable class HBasePartitioner [K : Ordering : ClassTag, V]( @@ -121,24 +120,24 @@ class HBasePartitioner [K : Ordering : ClassTag, V]( //Todo: test this object HBasePartitioner { - implicit def orderingRowKey[ImmutableBytesWritable]: Ordering[ImmutableBytesWritable] = - OrderingRowKey.asInstanceOf[Ordering[ImmutableBytesWritable]] + implicit def orderingRowKey[SparkImmutableBytesWritable]: Ordering[SparkImmutableBytesWritable] = + OrderingRowKey.asInstanceOf[Ordering[SparkImmutableBytesWritable]] /** * Return the start keys of all of the regions in this table, - * as a list of ImmutableBytesWritable. + * as a list of SparkImmutableBytesWritable. 
* Todo: should move to hbase relation after code clean */ def getRegionStartKeys(table: HTable) = { val byteKeys: Array[Array[Byte]] = table.getStartKeys - val ret = ArrayBuffer[ImmutableBytesWritable]() + val ret = ArrayBuffer[SparkImmutableBytesWritable]() for (byteKey <- byteKeys) { - ret += new ImmutableBytesWritable(byteKey) + ret += new SparkImmutableBytesWritable(byteKey) } ret } } -object OrderingRowKey extends Ordering[ImmutableBytesWritable] { - def compare(a: ImmutableBytesWritable, b: ImmutableBytesWritable) = a.compareTo(b) +object OrderingRowKey extends Ordering[SparkImmutableBytesWritable] { + def compare(a: SparkImmutableBytesWritable, b: SparkImmutableBytesWritable) = a.compareTo(b) } \ No newline at end of file diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/package.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/package.scala index 303723888dd5c..35bdd071dbd6e 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/package.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/package.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql +import org.apache.hadoop.hbase.io.ImmutableBytesWritable + package object hbase { type HBaseRawType = Array[Byte] } diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala index 54f16939d41e1..a16569bb621b8 100644 --- a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala +++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.hbase import org.scalatest.FunSuite import org.apache.spark.{SparkConf, LocalSparkContext, SparkContext, Logging} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.rdd.ShuffledRDD class HBasePartitionerSuite extends FunSuite with LocalSparkContext with Logging { @@ -26,8 +28,28 @@ class HBasePartitionerSuite extends FunSuite with LocalSparkContext with Logging test("test hbase partitioner") { sc = new SparkContext("local", "test") - val pairs = sc.parallelize((1 to 40).map(x => (x, x)), 4) - val groups = pairs.groupByKey(4).collect() - } + val data = (1 to 40).map { r => + val rowKey = Bytes.toBytes(r) + val rowKeyWritable = new SparkImmutableBytesWritable(rowKey) + (rowKeyWritable, r) + } + val rdd = sc.parallelize(data, 4) + val splitKeys = (1 to 40).filter(_ % 5 == 0).filter(_ != 40).map { r => + new SparkImmutableBytesWritable(Bytes.toBytes(r)) + } + val partitioner = new HBasePartitioner(rdd)(splitKeys.toArray) + val ordering = HBasePartitioner.orderingRowKey + .asInstanceOf[Ordering[SparkImmutableBytesWritable]] + val shuffled = + new ShuffledRDD[SparkImmutableBytesWritable, Int, Int](rdd, partitioner).setKeyOrdering(ordering) + val groups = shuffled.mapPartitionsWithIndex { (idx, iter) => + iter.map(x => (x._2, idx)) + }.collect() + assert(groups.size == 40) + assert(groups.map(_._2).toSet.size == 8) + groups.foreach { r => + assert(r._1 > 5 * r._2 && r._1 <= 5 * (1 + r._2)) + } + } } diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/TestData.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/TestData.scala index 89f537310aebf..919a0e4cad3b3 100644 --- a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/TestData.scala +++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/TestData.scala @@ -21,6 +21,7 @@ import java.sql.Timestamp import org.apache.spark.sql.catalyst.plans.logical import 
org.apache.spark.sql.test._ +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation /* Implicits */ import org.apache.spark.sql.test.TestSQLContext._ @@ -56,11 +57,11 @@ object TestData { // TODO: There is no way to express null primitives as case classes currently... val testData3 = - logical.LocalRelation('a.int, 'b.int).loadData( + LocalRelation('a.int, 'b.int).loadData( (1, null) :: (2, 2) :: Nil) - val emptyTableData = logical.LocalRelation('a.int, 'b.int) + val emptyTableData = LocalRelation('a.int, 'b.int) case class UpperCaseData(N: Int, L: String) val upperCaseData = From 9a826decc1c74837cc8d1f069f03d53500501eae Mon Sep 17 00:00:00 2001 From: wangfei Date: Tue, 28 Oct 2014 18:22:38 -0700 Subject: [PATCH 09/10] execution plan --- .../spark/sql/hbase/DataTypeUtils.scala | 23 +- .../apache/spark/sql/hbase/HBaseCatalog.scala | 37 +- .../spark/sql/hbase/HBasePartitioner.scala | 14 - .../spark/sql/hbase/HBaseRelation.scala | 369 ++++++----- .../spark/sql/hbase/HBaseSQLContext.scala | 5 +- .../spark/sql/hbase/HBaseSQLReaderRDD.scala | 11 +- .../spark/sql/hbase/HBaseStrategies.scala | 5 +- .../apache/spark/sql/hbase/HadoopReader.scala | 2 +- .../sql/hbase/execution/HBaseCommands.scala | 32 +- .../sql/hbase/execution/HBaseOperators.scala | 60 +- .../sql/hbase/logical/hbaseOperators.scala | 3 +- .../sql/hbase/BulkLoadIntoTableSuite.scala | 53 ++ .../apache/spark/sql/hbase/CatalogTest.scala | 32 +- .../sql/hbase/HBaseBasicOperationSuite.scala | 1 - .../spark/sql/hbase/HBaseMainTest.scala | 600 ++++++++---------- .../spark/sql/hbase/RowKeyParserSuite.scala | 213 ++++--- 16 files changed, 776 insertions(+), 684 deletions(-) create mode 100644 sql/hbase/src/test/scala/org/apache/spark/sql/hbase/BulkLoadIntoTableSuite.scala mode change 100644 => 100755 sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBasicOperationSuite.scala diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/DataTypeUtils.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/DataTypeUtils.scala index cd7eece7e0c05..75239f3eaf9d4 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/DataTypeUtils.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/DataTypeUtils.scala @@ -17,13 +17,13 @@ package org.apache.spark.sql.hbase import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.sql.catalyst.expressions.{MutableRow, Row} import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.catalyst.expressions.MutableRow /** -* Data Type conversion utilities -* -*/ + * Data Type conversion utilities + * + */ object DataTypeUtils { def setRowColumnFromHBaseRawType(row: MutableRow, index: Int, src: HBaseRawType, dt: DataType): Any = { @@ -39,4 +39,19 @@ object DataTypeUtils { case _ => throw new Exception("Unsupported HBase SQL Data Type") } } + + def getRowColumnFromHBaseRawType(row: Row, index: Int, + dt: DataType): HBaseRawType = { + dt match { + case StringType => Bytes.toBytes(row.getString(index)) + case IntegerType => Bytes.toBytes(row.getInt(index)) + case BooleanType => Bytes.toBytes(row.getBoolean(index)) + case ByteType => Bytes.toBytes(row.getByte(index)) + case DoubleType => Bytes.toBytes(row.getDouble(index)) + case FloatType => Bytes.toBytes(row.getFloat(index)) + case LongType => Bytes.toBytes(row.getLong(index)) + case ShortType => Bytes.toBytes(row.getShort(index)) + case _ => throw new Exception("Unsupported HBase SQL Data Type") + } + } } diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseCatalog.scala 
b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseCatalog.scala index d69e31ca6b8f1..6d49418424d74 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseCatalog.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseCatalog.scala @@ -43,10 +43,11 @@ sealed abstract class AbstractColumn { } } -case class KeyColumn(sqlName: String, dataType: DataType) extends AbstractColumn +case class KeyColumn(val sqlName: String, val dataType: DataType, val order: Int) + extends AbstractColumn -case class NonKeyColumn(sqlName: String, dataType: DataType, family: String, qualifier: String) - extends AbstractColumn { +case class NonKeyColumn(val sqlName: String, val dataType: DataType, + val family: String, val qualifier: String) extends AbstractColumn { @transient lazy val familyRaw = Bytes.toBytes(family) @transient lazy val qualifierRaw = Bytes.toBytes(qualifier) @@ -71,8 +72,7 @@ private[hbase] class HBaseCatalog(@transient hbaseContext: HBaseSQLContext) } def createTable(tableName: String, hbaseNamespace: String, hbaseTableName: String, - allColumns: Seq[KeyColumn], keyColumns: Seq[KeyColumn], - nonKeyColumns: Seq[NonKeyColumn]): Unit = { + allColumns: Seq[AbstractColumn]): Unit = { if (checkLogicalTableExist(tableName)) { throw new Exception(s"The logical table: $tableName already exists") } @@ -81,6 +81,8 @@ private[hbase] class HBaseCatalog(@transient hbaseContext: HBaseSQLContext) throw new Exception(s"The HBase table $hbaseTableName doesn't exist") } + val nonKeyColumns = allColumns.filter(_.isInstanceOf[NonKeyColumn]) + .asInstanceOf[Seq[NonKeyColumn]] nonKeyColumns.foreach { case NonKeyColumn(_, _, family, _) => if (!checkFamilyExists(hbaseTableName, family)) { @@ -149,18 +151,19 @@ private[hbase] class HBaseCatalog(@transient hbaseContext: HBaseSQLContext) put.add(ColumnFamily, QualHbaseName, Bytes.toBytes(result.toString)) */ - val hbaseRelation = HBaseRelation(configuration, hbaseContext, connection, - tableName, hbaseNamespace, hbaseTableName, allColumns, keyColumns, nonKeyColumns) + val hbaseRelation = HBaseRelation(Some(configuration), tableName + , hbaseNamespace, hbaseTableName, allColumns) - val bufout = new ByteArrayOutputStream() - val obout = new ObjectOutputStream(bufout) - obout.writeObject(hbaseRelation) + val byteArrayOutputStream = new ByteArrayOutputStream() + val objectOutputStream = new ObjectOutputStream(byteArrayOutputStream) + objectOutputStream.writeObject(hbaseRelation) - put.add(ColumnFamily, QualData, bufout.toByteArray) + put.add(ColumnFamily, QualData, byteArrayOutputStream.toByteArray) // write to the metadata table table.put(put) table.flushCommits() + table.close() relationMapCache.put(processTableName(tableName), hbaseRelation) } @@ -173,6 +176,7 @@ private[hbase] class HBaseCatalog(@transient hbaseContext: HBaseSQLContext) val get = new Get(Bytes.toBytes(tableName)) val values = table.get(get) + table.close() if (values == null) { result = None } else { @@ -232,14 +236,14 @@ private[hbase] class HBaseCatalog(@transient hbaseContext: HBaseSQLContext) } */ val value = values.getValue(ColumnFamily, QualData) - val bufferInput = new ByteArrayInputStream(value) - val objectInput = new ObjectInputStream(bufferInput) - val relation = objectInput.readObject().asInstanceOf[HBaseRelation]: HBaseRelation + val byteArrayInputStream = new ByteArrayInputStream(value) + val objectInputStream = new ObjectInputStream(byteArrayInputStream) + val relation = objectInputStream.readObject().asInstanceOf[HBaseRelation]: HBaseRelation val 
hbaseRelation = HBaseRelation( - configuration, hbaseContext, connection, + Some(configuration), relation.tableName, relation.hbaseNamespace, relation.hbaseTableName, - relation.allColumns, relation.keyColumns, relation.nonKeyColumns) + relation.allColumns) relationMapCache.put(processTableName(tableName), hbaseRelation) result = Some(hbaseRelation) } @@ -266,7 +270,6 @@ private[hbase] class HBaseCatalog(@transient hbaseContext: HBaseSQLContext) val delete = new Delete((Bytes.toBytes(tableName))) table.delete(delete) - table.close() relationMapCache.remove(processTableName(tableName)) diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala index ed5799f8f526e..1a92f321000a2 100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala @@ -122,20 +122,6 @@ class HBasePartitioner [K : Ordering : ClassTag, V]( object HBasePartitioner { implicit def orderingRowKey[SparkImmutableBytesWritable]: Ordering[SparkImmutableBytesWritable] = OrderingRowKey.asInstanceOf[Ordering[SparkImmutableBytesWritable]] - - /** - * Return the start keys of all of the regions in this table, - * as a list of SparkImmutableBytesWritable. - * Todo: should move to hbase relation after code clean - */ - def getRegionStartKeys(table: HTable) = { - val byteKeys: Array[Array[Byte]] = table.getStartKeys - val ret = ArrayBuffer[SparkImmutableBytesWritable]() - for (byteKey <- byteKeys) { - ret += new SparkImmutableBytesWritable(byteKey) - } - ret - } } object OrderingRowKey extends Ordering[SparkImmutableBytesWritable] { diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala index 83d26a72e8099..6ded72a256dc5 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala @@ -17,12 +17,12 @@ package org.apache.spark.sql.hbase import java.util.ArrayList -import java.util.concurrent.atomic.{AtomicInteger} +import java.util.concurrent.atomic.AtomicInteger import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.filter.{FilterList, Filter} -import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.{HBaseConfiguration, TableName, HRegionInfo, ServerName} import org.apache.log4j.Logger import org.apache.spark.Partition import org.apache.spark.sql.catalyst.expressions.{Row, MutableRow, _} @@ -32,37 +32,39 @@ import org.apache.spark.sql.catalyst.types._ import scala.collection.SortedMap import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer -private[hbase] case class HBaseRelation( - @transient configuration: Configuration, - @transient hbaseContext: HBaseSQLContext, - @transient connection: HConnection, +private[hbase] case class HBaseRelation( @transient configuration: Option[Configuration], tableName: String, hbaseNamespace: String, hbaseTableName: String, - allColumns: Seq[AbstractColumn], - keyColumns: Seq[KeyColumn], - nonKeyColumns: Seq[NonKeyColumn] - ) + allColumns: Seq[AbstractColumn]) extends LeafNode { self: Product => - @transient lazy val handle: HTable = new HTable(configuration, hbaseTableName) + @transient lazy val htable: HTable = new HTable(configuration_, hbaseTableName) @transient lazy val logger = 
Logger.getLogger(getClass.getName) + @transient lazy val keyColumns = allColumns.filter(_.isInstanceOf[KeyColumn]) + .asInstanceOf[Seq[KeyColumn]].sortBy(_.order) + @transient lazy val nonKeyColumns = allColumns.filter(_.isInstanceOf[NonKeyColumn]) + .asInstanceOf[Seq[NonKeyColumn]] @transient lazy val partitionKeys = keyColumns.map(col => AttributeReference(col.sqlName, col.dataType, nullable = false)()) @transient lazy val columnMap = allColumns.map { - case key: KeyColumn => (key.sqlName, keyColumns.indexOf(key)) + case key: KeyColumn => (key.sqlName, key.order) case nonKey: NonKeyColumn => (nonKey.sqlName, nonKey) }.toMap + @transient private lazy val configuration_ = if (configuration != null) configuration.getOrElse(HBaseConfiguration.create()) + else HBaseConfiguration.create() + lazy val attributes = nonKeyColumns.map(col => AttributeReference(col.sqlName, col.dataType, nullable = true)()) // lazy val colFamilies = nonKeyColumns.map(_.family).distinct // lazy val applyFilters = false - def closeHTable() = handle.close + def closeHTable() = htable.close override def output: Seq[Attribute] = { allColumns.map { @@ -71,17 +73,13 @@ private[hbase] case class HBaseRelation( } } - //TODO-XY:ADD getPrunedPartitions lazy val partitions: Seq[HBasePartition] = { - val tableNameInSpecialClass = TableName.valueOf(hbaseNamespace, tableName) - val regionLocations = connection.locateRegions(tableNameInSpecialClass) - regionLocations.asScala - .zipWithIndex.map { case (hregionLocation, index) => - val regionInfo = hregionLocation.getRegionInfo - new HBasePartition(index, Some(regionInfo.getStartKey), - Some(regionInfo.getEndKey), - Some(hregionLocation.getServerName.getHostname)) - } + val regionLocations = htable.getRegionLocations.asScala.toSeq + regionLocations.zipWithIndex.map(p => + new HBasePartition(p._2, Some(p._1._1.getStartKey), + Some(p._1._1.getEndKey), + Some(p._1._2.getHostname)) + ) } def getPrunedPartitions(partionPred: Option[Expression] = None): Option[Seq[HBasePartition]] = { @@ -89,6 +87,20 @@ private[hbase] case class HBaseRelation( Option(partitions) } + + /** + * Return the start keys of all of the regions in this table, + * as a list of SparkImmutableBytesWritable. 
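+   * These are intended to supply the split keys for HBasePartitioner during bulk load
+   * (see BulkLoadIntoTable, where splitKeys is still a TODO).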
+ */ + def getRegionStartKeys() = { + val byteKeys: Array[Array[Byte]] = htable.getStartKeys + val ret = ArrayBuffer[SparkImmutableBytesWritable]() + for (byteKey <- byteKeys) { + ret += new SparkImmutableBytesWritable(byteKey) + } + ret + } + def buildFilter(projList: Seq[NamedExpression], rowKeyPredicate: Option[Expression], valuePredicate: Option[Expression]) = { @@ -106,18 +118,20 @@ private[hbase] case class HBaseRelation( def buildScan(split: Partition, filters: Option[FilterList], projList: Seq[NamedExpression]): Scan = { val hbPartition = split.asInstanceOf[HBasePartition] - val scan = { - (hbPartition.lowerBound, hbPartition.upperBound) match { - case (Some(lb), Some(ub)) => new Scan(lb, ub) - case (Some(lb), None) => new Scan(lb) - case _ => new Scan - } - } - if (filters.isDefined) { - scan.setFilter(filters.get) - } - // TODO: add add Family to SCAN from projections - scan +// val scan = { +// (hbPartition.lowerBound, hbPartition.upperBound) match { +// case (Some(lb), Some(ub)) => new Scan(lb, ub) +// case (Some(lb), None) => new Scan(lb) +// case _ => new Scan +// } +// } +//// if (filters.isDefined) { +//// scan.setFilter(filters.get) +//// } +// // TODO: add add Family to SCAN from projections +// scan + + new Scan } def buildGet(projList: Seq[NamedExpression], rowKey: HBaseRawType) { @@ -126,134 +140,185 @@ private[hbase] case class HBaseRelation( } /** - * Trait for RowKeyParser's that convert a raw array of bytes into their constituent - * logical column values - * + * create row key based on key columns information + * @param rawKeyColumns sequence of byte array representing the key columns + * @return array of bytes */ - trait AbstractRowKeyParser { - def createKey(rawBytes: Seq[HBaseRawType], version: Byte): HBaseRawType - - def parseRowKey(rowKey: HBaseRawType): Seq[HBaseRawType] - - def parseRowKeyWithMetaData(rkCols: Seq[KeyColumn], rowKey: HBaseRawType) - : SortedMap[TableName, (KeyColumn, Any)] // TODO change Any - } - - case class RowKeySpec(offsets: Seq[Int], version: Byte = RowKeyParser.Version1) - - // TODO(Bo): replace the implementation with the null-byte terminated string logic - object RowKeyParser extends AbstractRowKeyParser with Serializable { - val Version1 = 1.toByte - val VersionFieldLen = 1 - // Length in bytes of the RowKey version field - val DimensionCountLen = 1 - // One byte for the number of key dimensions - val MaxDimensions = 255 - val OffsetFieldLen = 2 - - // Two bytes for the value of each dimension offset. - // Therefore max size of rowkey is 65535. Note: if longer rowkeys desired in future - // then simply define a new RowKey version to support it. Otherwise would be wasteful - // to define as 4 bytes now. 
- def computeLength(keys: Seq[HBaseRawType]) = { - VersionFieldLen + keys.map(_.length).sum + - OffsetFieldLen * keys.size + DimensionCountLen - } - - override def createKey(keys: Seq[HBaseRawType], version: Byte = Version1): HBaseRawType = { - val barr = new Array[Byte](computeLength(keys)) - val arrayx = new AtomicInteger(0) - barr(arrayx.getAndAdd(VersionFieldLen)) = version // VersionByte - - // Remember the starting offset of first data value - val valuesStartIndex = new AtomicInteger(arrayx.get) - - // copy each of the dimension values in turn - keys.foreach { k => copyToArr(barr, k, arrayx.getAndAdd(k.length))} - - // Copy the offsets of each dim value - // The valuesStartIndex is the location of the first data value and thus the first - // value included in the Offsets sequence - keys.foreach { k => - copyToArr(barr, - short2b(valuesStartIndex.getAndAdd(k.length).toShort), - arrayx.getAndAdd(OffsetFieldLen)) + def encodingRawKeyColumns(rawKeyColumns: Seq[HBaseRawType]): HBaseRawType = { + var buffer = ArrayBuffer[Byte]() + val delimiter: Byte = 0 + var index = 0 + for (rawKeyColumn <- rawKeyColumns) { + val keyColumn = keyColumns(index) + buffer = buffer ++ rawKeyColumn + if (keyColumn.dataType == StringType) { + buffer += delimiter } - barr(arrayx.get) = keys.length.toByte // DimensionCountByte - barr + index = index + 1 } + buffer.toArray + } - def copyToArr[T](a: Array[T], b: Array[T], aoffset: Int) = { - b.copyToArray(a, aoffset) - } - - def short2b(sh: Short): Array[Byte] = { - val barr = Array.ofDim[Byte](2) - barr(0) = ((sh >> 8) & 0xff).toByte - barr(1) = (sh & 0xff).toByte - barr - } - - def b2Short(barr: Array[Byte]) = { - val out = (barr(0).toShort << 8) | barr(1).toShort - out - } - - def createKeyFromCatalystRow(schema: StructType, keyCols: Seq[KeyColumn], row: Row) = { - // val rawKeyCols = DataTypeUtils.catalystRowToHBaseRawVals(schema, row, keyCols) - // createKey(rawKeyCols) - null - } - - def getMinimumRowKeyLength = VersionFieldLen + DimensionCountLen - - override def parseRowKey(rowKey: HBaseRawType): Seq[HBaseRawType] = { - assert(rowKey.length >= getMinimumRowKeyLength, - s"RowKey is invalid format - less than minlen . Actual length=${rowKey.length}") - assert(rowKey(0) == Version1, s"Only Version1 supported. 
Actual=${rowKey(0)}") - val ndims: Int = rowKey(rowKey.length - 1).toInt - val offsetsStart = rowKey.length - DimensionCountLen - ndims * OffsetFieldLen - val rowKeySpec = RowKeySpec( - for (dx <- 0 to ndims - 1) - yield b2Short(rowKey.slice(offsetsStart + dx * OffsetFieldLen, - offsetsStart + (dx + 1) * OffsetFieldLen)) - ) - - val endOffsets = rowKeySpec.offsets.tail :+ (rowKey.length - DimensionCountLen - 1) - val colsList = rowKeySpec.offsets.zipWithIndex.map { case (off, ix) => - rowKey.slice(off, endOffsets(ix)) + /** + * get the sequence of key columns from the byte array + * @param rowKey array of bytes + * @return sequence of byte array + */ + def decodingRawKeyColumns(rowKey: HBaseRawType): Seq[HBaseRawType] = { + var rowKeyList = List[HBaseRawType]() + val delimiter: Byte = 0 + var index = 0 + for (keyColumn <- keyColumns) { + var buffer = ArrayBuffer[Byte]() + val dataType = keyColumn.dataType + if (dataType == StringType) { + while (index < rowKey.length && rowKey(index) != delimiter) { + buffer += rowKey(index) + index = index + 1 + } + index = index + 1 } - colsList - } - - //TODO - override def parseRowKeyWithMetaData(rkCols: Seq[KeyColumn], rowKey: HBaseRawType): - SortedMap[TableName, (KeyColumn, Any)] = { - import scala.collection.mutable.HashMap - - // val rowKeyVals = parseRowKey(rowKey) - // val rmap = rowKeyVals.zipWithIndex.foldLeft(new HashMap[ColumnName, (Column, Any)]()) { - // case (m, (cval, ix)) => - // m.update(rkCols(ix).toColumnName, (rkCols(ix), - // hbaseFieldToRowField(cval, rkCols(ix).dataType))) - // m - // } - // TreeMap(rmap.toArray: _*)(Ordering.by { cn: ColumnName => rmap(cn)._1.ordinal}) - // .asInstanceOf[SortedMap[ColumnName, (Column, Any)]] - null - } - - def show(bytes: Array[Byte]) = { - val len = bytes.length - // val out = s"Version=${bytes(0).toInt} NumDims=${bytes(len - 1)} " + else { + val length = NativeType.defaultSizeOf(dataType.asInstanceOf[NativeType]) + for (i <- 0 to (length - 1)) { + buffer += rowKey(index) + index = index + 1 + } + } + rowKeyList = rowKeyList :+ buffer.toArray } - + rowKeyList } + // /** + // * Trait for RowKeyParser's that convert a raw array of bytes into their constituent + // * logical column values + // * + // */ + // trait AbstractRowKeyParser { + // + //// def createKey(rawBytes: Seq[HBaseRawType], version: Byte): HBaseRawType + //// + //// def parseRowKey(rowKey: HBaseRawType): Seq[HBaseRawType] + //// + //// def parseRowKeyWithMetaData(rkCols: Seq[KeyColumn], rowKey: HBaseRawType) + //// : SortedMap[TableName, (KeyColumn, Any)] // TODO change Any + // } + // + // case class RowKeySpec(offsets: Seq[Int], version: Byte = RowKeyParser.Version1) + // + // // TODO(Bo): replace the implementation with the null-byte terminated string logic + // object RowKeyParser extends AbstractRowKeyParser with Serializable { + // val Version1 = 1.toByte + // val VersionFieldLen = 1 + // // Length in bytes of the RowKey version field + // val DimensionCountLen = 1 + // // One byte for the number of key dimensions + // val MaxDimensions = 255 + // val OffsetFieldLen = 2 + // + // // Two bytes for the value of each dimension offset. + // // Therefore max size of rowkey is 65535. Note: if longer rowkeys desired in future + // // then simply define a new RowKey version to support it. Otherwise would be wasteful + // // to define as 4 bytes now. 
+ // def computeLength(keys: Seq[HBaseRawType]) = { + // VersionFieldLen + keys.map(_.length).sum + + // OffsetFieldLen * keys.size + DimensionCountLen + // } + // + // override def createKey(keys: Seq[HBaseRawType], version: Byte = Version1): HBaseRawType = { + // val barr = new Array[Byte](computeLength(keys)) + // val arrayx = new AtomicInteger(0) + // barr(arrayx.getAndAdd(VersionFieldLen)) = version // VersionByte + // + // // Remember the starting offset of first data value + // val valuesStartIndex = new AtomicInteger(arrayx.get) + // + // // copy each of the dimension values in turn + // keys.foreach { k => copyToArr(barr, k, arrayx.getAndAdd(k.length))} + // + // // Copy the offsets of each dim value + // // The valuesStartIndex is the location of the first data value and thus the first + // // value included in the Offsets sequence + // keys.foreach { k => + // copyToArr(barr, + // short2b(valuesStartIndex.getAndAdd(k.length).toShort), + // arrayx.getAndAdd(OffsetFieldLen)) + // } + // barr(arrayx.get) = keys.length.toByte // DimensionCountByte + // barr + // } + // + // def copyToArr[T](a: Array[T], b: Array[T], aoffset: Int) = { + // b.copyToArray(a, aoffset) + // } + // + // def short2b(sh: Short): Array[Byte] = { + // val barr = Array.ofDim[Byte](2) + // barr(0) = ((sh >> 8) & 0xff).toByte + // barr(1) = (sh & 0xff).toByte + // barr + // } + // + // def b2Short(barr: Array[Byte]) = { + // val out = (barr(0).toShort << 8) | barr(1).toShort + // out + // } + // + // def createKeyFromCatalystRow(schema: StructType, keyCols: Seq[KeyColumn], row: Row) = { + // // val rawKeyCols = DataTypeUtils.catalystRowToHBaseRawVals(schema, row, keyCols) + // // createKey(rawKeyCols) + // null + // } + // + // def getMinimumRowKeyLength = VersionFieldLen + DimensionCountLen + // + // override def parseRowKey(rowKey: HBaseRawType): Seq[HBaseRawType] = { + // assert(rowKey.length >= getMinimumRowKeyLength, + // s"RowKey is invalid format - less than minlen . Actual length=${rowKey.length}") + // assert(rowKey(0) == Version1, s"Only Version1 supported. 
Actual=${rowKey(0)}") + // val ndims: Int = rowKey(rowKey.length - 1).toInt + // val offsetsStart = rowKey.length - DimensionCountLen - ndims * OffsetFieldLen + // val rowKeySpec = RowKeySpec( + // for (dx <- 0 to ndims - 1) + // yield b2Short(rowKey.slice(offsetsStart + dx * OffsetFieldLen, + // offsetsStart + (dx + 1) * OffsetFieldLen)) + // ) + // + // val endOffsets = rowKeySpec.offsets.tail :+ (rowKey.length - DimensionCountLen - 1) + // val colsList = rowKeySpec.offsets.zipWithIndex.map { case (off, ix) => + // rowKey.slice(off, endOffsets(ix)) + // } + // colsList + // } + // + //// //TODO + //// override def parseRowKeyWithMetaData(rkCols: Seq[KeyColumn], rowKey: HBaseRawType): + //// SortedMap[TableName, (KeyColumn, Any)] = { + //// import scala.collection.mutable.HashMap + //// + //// val rowKeyVals = parseRowKey(rowKey) + //// val rmap = rowKeyVals.zipWithIndex.foldLeft(new HashMap[ColumnName, (Column, Any)]()) { + //// case (m, (cval, ix)) => + //// m.update(rkCols(ix).toColumnName, (rkCols(ix), + //// hbaseFieldToRowField(cval, rkCols(ix).dataType))) + //// m + //// } + //// TreeMap(rmap.toArray: _*)(Ordering.by { cn: ColumnName => rmap(cn)._1.ordinal}) + //// .asInstanceOf[SortedMap[ColumnName, (Column, Any)]] + //// } + // + // def show(bytes: Array[Byte]) = { + // val len = bytes.length + // // val out = s"Version=${bytes(0).toInt} NumDims=${bytes(len - 1)} " + // } + // + // } + def buildRow(projections: Seq[(Attribute, Int)], result: Result, row: MutableRow): Row = { assert(projections.size == row.length, "Projection size and row size mismatched") // TODO: replaced with the new Key method - val rowKeys = RowKeyParser.parseRowKey(result.getRow) + val rowKeys = decodingRawKeyColumns(result.getRow) projections.foreach { p => columnMap.get(p._1.name).get match { case column: NonKeyColumn => { diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLContext.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLContext.scala index 40b04c6a601a5..db857003ceb8f 100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLContext.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLContext.scala @@ -17,15 +17,12 @@ package org.apache.spark.sql.hbase -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} +import java.io.DataOutputStream -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase._ import org.apache.spark.SparkContext import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ -import org.apache.spark.sql.hbase.HBaseCatalog._ /** * An instance of the Spark SQL execution engine that integrates with data stored in Hive. 
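
For reference, the encodingRawKeyColumns/decodingRawKeyColumns pair added to HBaseRelation above replaces the old offset-based RowKeyParser with a delimiter scheme: key column bytes are concatenated in key order, each StringType column is terminated by a 0x00 byte, and fixed-width native types are read back by their default size. A minimal standalone sketch of that round trip (RowKeyCodecSketch, Key, encode and decode are hypothetical names, not classes from this patch):

import scala.collection.mutable.ArrayBuffer

object RowKeyCodecSketch {
  // Hypothetical stand-in for a key column: only what the codec needs to know.
  case class Key(isString: Boolean, fixedWidth: Int)

  val delimiter: Byte = 0x00

  // Concatenate raw column bytes in key order; string columns get a trailing 0x00.
  def encode(cols: Seq[Array[Byte]], keys: Seq[Key]): Array[Byte] = {
    val buf = ArrayBuffer[Byte]()
    cols.zip(keys).foreach { case (bytes, key) =>
      buf ++= bytes
      if (key.isString) buf += delimiter
    }
    buf.toArray
  }

  // Walk the row key: read up to the delimiter for strings, a fixed width otherwise.
  def decode(rowKey: Array[Byte], keys: Seq[Key]): Seq[Array[Byte]] = {
    var index = 0
    keys.map { key =>
      val start = index
      if (key.isString) {
        while (index < rowKey.length && rowKey(index) != delimiter) index += 1
        val col = rowKey.slice(start, index)
        index += 1 // skip the delimiter
        col
      } else {
        index += key.fixedWidth
        rowKey.slice(start, index)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq(Key(isString = true, fixedWidth = 0), Key(isString = false, fixedWidth = 4))
    val cols = Seq("region".getBytes("UTF-8"), Array[Byte](0, 0, 0, 42))
    val decoded = decode(encode(cols, keys), keys)
    assert(decoded.zip(cols).forall { case (a, b) => a.sameElements(b) })
  }
}

One limitation worth noting: a string key value that itself contains a 0x00 byte cannot be represented under this encoding.
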
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLReaderRDD.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLReaderRDD.scala index 219a758698213..ce8880dfe179b 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLReaderRDD.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLReaderRDD.scala @@ -53,16 +53,20 @@ class HBaseSQLReaderRDD(relation: HBaseRelation, val filters = relation.buildFilter(output, rowKeyPred, valuePred) val scan = relation.buildScan(split, filters, output) scan.setCaching(cachingSize) - val scanner = relation.handle.getScanner(scan) + val scanner = relation.htable.getScanner(scan) var finished: Boolean = false + var gotNext: Boolean = false var result: Result = null val row = new GenericMutableRow(output.size) val projections = output.zipWithIndex val iter = new Iterator[Row] { override def hasNext: Boolean = { if (!finished) { - result = scanner.next - finished = result == null + if (!gotNext) { + result = scanner.next + finished = result == null + gotNext = true + } } if (finished) { close @@ -72,6 +76,7 @@ class HBaseSQLReaderRDD(relation: HBaseRelation, override def next(): Row = { if (hasNext) { + gotNext = false relation.buildRow(projections, result, row) } else { null diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala index 3b64b94ebef9f..af3d8ad95dc7c 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner} import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution._ import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hbase.execution.{HBaseSQLTableScan, InsertIntoHBaseTable} +import org.apache.spark.sql.hbase.execution.{DropHbaseTableCommand, HBaseSQLTableScan, InsertIntoHBaseTable} /** * HBaseStrategies @@ -99,7 +99,8 @@ private[hbase] trait HBaseStrategies extends QueryPlanner[SparkPlan] { execution.BulkLoadIntoTable(path, table, isLocal)(hbaseSQLContext) :: Nil case InsertIntoTable(table: HBaseRelation, partition, child, _) => new InsertIntoHBaseTable(table, planLater(child))(hbaseSQLContext) :: Nil - case logical.DropTablePlan(tableName) => Seq(execution.DropHbaseTableCommand(tableName)(hbaseSQLContext)) + case logical.DropTablePlan(tableName) => + Seq(DropHbaseTableCommand(tableName)(hbaseSQLContext)) case _ => Nil } } diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HadoopReader.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HadoopReader.scala index aee77ae485a33..efaa301d98caf 100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HadoopReader.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HadoopReader.scala @@ -65,7 +65,7 @@ class HadoopReader( // Todo: needs info which field is rowkey and value from HbaseRelation here val fields = value._2.toString.split(splitRegex) val rowKey = Bytes.toBytes(fields(0)) - val rowKeyWritable = new ImmutableBytesWritable(rowKey) + val rowKeyWritable = new SparkImmutableBytesWritable(rowKey) val family = Bytes.toBytes("cf") val qualifier = Bytes.toBytes("count") val hbaseValue = Bytes.toBytes(fields(1)) diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseCommands.scala 
b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseCommands.scala index 2adcd5ee6e610..ea43cd845bc97 100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseCommands.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseCommands.scala @@ -33,23 +33,27 @@ case class CreateHBaseTableCommand(tableName: String, override protected[sql] lazy val sideEffectResult = { val catalog = context.catalog - val keyColumns = keyCols.map { case (name, typeOfData) => - KeyColumn(name, catalog.getDataType(typeOfData)) - } - val nonKeyColumns = nonKeyCols.map { - case (name, typeOfData, family, qualifier) => - NonKeyColumn(name, catalog.getDataType(typeOfData), family, qualifier) - } - - val colWithTypeMap = (keyCols union nonKeyCols.map { - case (name, datatype, _, _) => (name, datatype) - }).toMap + val keyMap = keyCols.toMap val allColumns = colsSeq.map { - case name => - KeyColumn(name, catalog.getDataType(colWithTypeMap.get(name).get)) + case name => { + if (keyMap.contains(name)) { + KeyColumn( + name, + catalog.getDataType(keyMap.get(name).get), + keyCols.indexWhere(_._1 == name)) + } else { + val nonKeyCol = nonKeyCols.find(_._1 == name).get + NonKeyColumn( + name, + catalog.getDataType(nonKeyCol._2), + nonKeyCol._3, + nonKeyCol._4 + ) + } + } } - catalog.createTable(tableName, nameSpace, hbaseTable, allColumns, keyColumns, nonKeyColumns) + catalog.createTable(tableName, nameSpace, hbaseTable, allColumns) Seq.empty[Row] } diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala index 26d704f15ef48..97413c6edd569 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala @@ -16,13 +16,22 @@ */ package org.apache.spark.sql.hbase.execution +import java.util.Date +import java.text.SimpleDateFormat + +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat +import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.{CompatibilitySingletonFactory, HadoopShims} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.{ShuffledRDD, RDD} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.{LeafNode, UnaryNode, SparkPlan} -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.spark.SparkContext._ +import org.apache.hadoop.mapred.{Reporter, JobConf} import org.apache.hadoop.hbase.client.Put import org.apache.spark.SerializableWritable import org.apache.spark.sql.hbase._ @@ -81,26 +90,53 @@ case class BulkLoadIntoTable(path: String, relation: HBaseRelation, isLocal: Boo val hadoopReader = new HadoopReader(hbContext.sparkContext, jobConf) - // TODO: get from raltion (or config) - val splitKeys = ??? 
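// ---------------------------------------------------------------------------
// Illustrative aside, not part of this hunk: execute() below shuffles the
// (rowkey, Put) pairs with an HBasePartitioner built from the table's region
// start keys (relation.getRegionStartKeys()), so the output HFiles line up
// with the table's regions. A standalone sketch of that bucket selection over
// raw byte-array keys; RegionBucketSketch, compare and bucketFor are
// hypothetical names and this is not the patch's partitioner, it only mirrors
// the idea:
object RegionBucketSketch {
  // Unsigned lexicographic comparison, the order HBase uses for row keys.
  def compare(a: Array[Byte], b: Array[Byte]): Int = {
    var i = 0
    while (i < a.length && i < b.length) {
      val d = (a(i) & 0xff) - (b(i) & 0xff)
      if (d != 0) return d
      i += 1
    }
    a.length - b.length
  }

  // splitKeys holds the start keys of regions 1..n-1 (region 0 starts at the
  // empty key); a row key maps to the last region whose start key is <= it.
  def bucketFor(rowKey: Array[Byte], splitKeys: Seq[Array[Byte]]): Int = {
    var partition = 0
    while (partition < splitKeys.length && compare(rowKey, splitKeys(partition)) >= 0) {
      partition += 1
    }
    partition
  }

  def main(args: Array[String]): Unit = {
    val splits = Seq(Array[Byte](10), Array[Byte](20))
    assert(bucketFor(Array[Byte](5), splits) == 0)   // before the first split
    assert(bucketFor(Array[Byte](10), splits) == 1)  // on a region start key
    assert(bucketFor(Array[Byte](25), splits) == 2)  // past the last split
  }
}
// ---------------------------------------------------------------------------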
+ // atmp path for storing HFile + val tmpPath = "/bulkload/test" override def execute() = { - implicit val ordering = HBasePartitioner.orderingRowKey - .asInstanceOf[Ordering[ImmutableBytesWritable]] + val ordering = HBasePartitioner.orderingRowKey + .asInstanceOf[Ordering[SparkImmutableBytesWritable]] + val splitKeys = relation.getRegionStartKeys() val rdd = hadoopReader.makeBulkLoadRDD val partitioner = new HBasePartitioner(rdd)(splitKeys) val shuffled = - new ShuffledRDD[ImmutableBytesWritable, Put, Put](rdd, partitioner).setKeyOrdering(ordering) + new ShuffledRDD[SparkImmutableBytesWritable, Put, Put](rdd, partitioner).setKeyOrdering(ordering) + + jobConf.setOutputKeyClass(classOf[SparkImmutableBytesWritable]) + jobConf.setOutputValueClass(classOf[Put]) + jobConf.set("mapred.output.format.class", classOf[HFileOutputFormat].getName) + jobConf.set("mapred.output.dir", tmpPath) + shuffled.saveAsHadoopDataset(jobConf) - val jobConfSer = new SerializableWritable(jobConf) - saveAsHFile(shuffled, jobConfSer) - // bulk load is like a command, so return null is ok here null } def saveAsHFile( - rdd: RDD[(ImmutableBytesWritable, Put)], - jobConf: SerializableWritable[JobConf]) { + rdd: RDD[(SparkImmutableBytesWritable, Put)], + jobConf: SerializableWritable[JobConf], + path: String) { + val conf = jobConf.value + val job = new Job(conf) + job.setOutputKeyClass(classOf[SparkImmutableBytesWritable]) + job.setOutputValueClass(classOf[Put]) + job.setOutputFormatClass(classOf[HFileOutputFormat]) + FileOutputFormat.setOutputPath(job, new Path(path)) + + val wrappedConf = new SerializableWritable(job.getConfiguration) + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + val jobtrackerID = formatter.format(new Date()) + val stageId = sqlContext.sparkContext.newRddId() + + + def getWriter( + outFormat: HFileOutputFormat, + conf: Configuration, + path: Path, + reporter: Reporter) = { + val hadoopShim = CompatibilitySingletonFactory.getInstance(classOf[HadoopShims]) + val context = hadoopShim.createTestTaskAttemptContext(job, "attempt_200707121733_0001_m_000000_0") + outFormat.getRecordWriter(context) + } ??? } diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/hbaseOperators.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/hbaseOperators.scala index 378f97c70b318..3bc6b9a72890b 100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/hbaseOperators.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/hbaseOperators.scala @@ -16,8 +16,7 @@ */ package org.apache.spark.sql.hbase.logical -import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, LeafNode, LogicalPlan, Command} -import org.apache.spark.sql.hbase.HBaseRelation +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode, Command} case class CreateHBaseTablePlan(tableName: String, nameSpace: String, diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/BulkLoadIntoTableSuite.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/BulkLoadIntoTableSuite.scala new file mode 100644 index 0000000000000..3522f9a969676 --- /dev/null +++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/BulkLoadIntoTableSuite.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hbase + +import org.scalatest.FunSuite +import org.apache.spark.{SparkContext, Logging, LocalSparkContext} +import org.apache.spark.rdd.ShuffledRDD +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.hbase.util.Bytes + +class BulkLoadIntoTableSuite extends FunSuite with LocalSparkContext with Logging{ + + test("write data to HFile") { + sc = new SparkContext("local", "test") + val jobConf = new JobConf(sc.hadoopConfiguration) + val hadoopReader = new HadoopReader(sc, jobConf) + // atmp path for storing HFile + val tmpPath = "/bulkload/test" + val ordering = HBasePartitioner.orderingRowKey + .asInstanceOf[Ordering[SparkImmutableBytesWritable]] + val splitKeys = (1 to 40).filter(_ % 5 == 0).filter(_ != 40).map { r => + new SparkImmutableBytesWritable(Bytes.toBytes(r)) + } + val rdd = hadoopReader.makeBulkLoadRDD + val partitioner = new HBasePartitioner(rdd)(splitKeys) + val shuffled = + new ShuffledRDD[SparkImmutableBytesWritable, Put, Put](rdd, partitioner).setKeyOrdering(ordering) + + jobConf.setOutputKeyClass(classOf[SparkImmutableBytesWritable]) + jobConf.setOutputValueClass(classOf[Put]) + jobConf.set("mapred.output.format.class", classOf[HFileOutputFormat].getName) + jobConf.set("mapred.output.dir", tmpPath) + shuffled.saveAsHadoopDataset(jobConf) + } + +} diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/CatalogTest.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/CatalogTest.scala index cb18da1c5e849..82a124f10b844 100644 --- a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/CatalogTest.scala +++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/CatalogTest.scala @@ -58,25 +58,13 @@ class CatalogTest extends FunSuite with BeforeAndAfterAll with Logging { admin.createTable(desc) } - var allColumns = List[KeyColumn]() - allColumns = allColumns :+ KeyColumn("column2", IntegerType) - allColumns = allColumns :+ KeyColumn("column1", StringType) - allColumns = allColumns :+ KeyColumn("column4", FloatType) - allColumns = allColumns :+ KeyColumn("column3", BooleanType) + var allColumns = List[AbstractColumn]() + allColumns = allColumns :+ KeyColumn("column2", IntegerType, 1) + allColumns = allColumns :+ KeyColumn("column1", StringType, 0) + allColumns = allColumns :+ NonKeyColumn("column4", FloatType, family2, "qualifier2") + allColumns = allColumns :+ NonKeyColumn("column3", BooleanType, family1, "qualifier1") - val keyColumn1 = KeyColumn("column1", StringType) - val keyColumn2 = KeyColumn("column2", IntegerType) - var keyColumns = List[KeyColumn]() - keyColumns = keyColumns :+ keyColumn1 - keyColumns = keyColumns :+ keyColumn2 - - val nonKeyColumn3 = NonKeyColumn("column3", BooleanType, family1, "qualifier1") - val nonKeyColumn4 = NonKeyColumn("column4", FloatType, family2, "qualifier2") - var nonKeyColumns = List[NonKeyColumn]() 
- nonKeyColumns = nonKeyColumns :+ nonKeyColumn3 - nonKeyColumns = nonKeyColumns :+ nonKeyColumn4 - - catalog.createTable(tableName, namespace, hbaseTableName, allColumns, keyColumns, nonKeyColumns) + catalog.createTable(tableName, namespace, hbaseTableName, allColumns) } test("Get Table") { @@ -98,13 +86,13 @@ class CatalogTest extends FunSuite with BeforeAndAfterAll with Logging { // check the data type assert(result.keyColumns(0).dataType === StringType) assert(result.keyColumns(1).dataType === IntegerType) - assert(result.nonKeyColumns(0).dataType === BooleanType) - assert(result.nonKeyColumns(1).dataType === FloatType) + assert(result.nonKeyColumns(0).dataType === FloatType) + assert(result.nonKeyColumns(1).dataType === BooleanType) val relation = catalog.lookupRelation(None, tableName) val hbRelation = relation.asInstanceOf[HBaseRelation] - assert(hbRelation.nonKeyColumns.map(_.family) == List("family1", "family2")) - val keyColumns = Seq(KeyColumn("column1", StringType), KeyColumn("column2", IntegerType)) + assert(hbRelation.nonKeyColumns.map(_.family) == List("family2", "family1")) + val keyColumns = Seq(KeyColumn("column1", StringType, 0), KeyColumn("column2", IntegerType, 1)) assert(hbRelation.keyColumns.equals(keyColumns)) assert(relation.childrenResolved) } diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBasicOperationSuite.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBasicOperationSuite.scala old mode 100644 new mode 100755 index 81dad0bc13208..8ae7449a12940 --- a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBasicOperationSuite.scala +++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBasicOperationSuite.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.hbase.TestHbase._ @Ignore class HBaseBasicOperationSuite extends QueryTest { - TestData // Initialize TestData test("create table") { sql( """CREATE TABLE tableName (col1 STRING, col2 BYTE, col3 SHORT, col4 INTEGER, diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseMainTest.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseMainTest.scala index 573563589080c..d4e7fef69621e 100644 --- a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseMainTest.scala +++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseMainTest.scala @@ -1,192 +1,169 @@ -//package org.apache.spark.sql.hbase -// -//import java.io.{ObjectOutputStream, ByteArrayOutputStream, DataOutputStream} -// -//import org.apache.hadoop.conf.Configuration -//import org.apache.hadoop.hbase._ -//import org.apache.hadoop.hbase.client._ -//import org.apache.log4j.Logger -//import org.apache.spark -//import org.apache.spark.sql.SchemaRDD -//import org.apache.spark.sql.catalyst.expressions.Attribute -//import org.apache.spark.sql.catalyst.types.{DoubleType, ShortType, StringType} -//import org.apache.spark.sql.hbase.DataTypeUtils._ -//import org.apache.spark.sql.hbase.HBaseCatalog.{Column, Columns} -//import org.apache.spark.sql.test.TestSQLContext -//import org.apache.spark.sql.test.TestSQLContext._ -//import org.apache.spark.{Logging, SparkConf, sql} -//import org.scalatest.{BeforeAndAfterAll, FunSuite} -//import spark.sql.Row -// -///** -// * HBaseIntegrationTest -// * Created by sboesch on 9/27/14. 
-// */ -//object HBaseMainTest extends FunSuite with BeforeAndAfterAll with Logging { -// @transient val logger = Logger.getLogger(getClass.getName) -// -// val useMiniCluster: Boolean = false -// -// val NMasters = 1 -// val NRegionServers = 1 -// // 3 -// val NDataNodes = 0 -// -// val NWorkers = 1 -// -// @transient var cluster: MiniHBaseCluster = null -// @transient var config: Configuration = null -// @transient var hbaseAdmin: HBaseAdmin = null -// @transient var hbContext: HBaseSQLContext = null -// @transient var catalog: HBaseCatalog = null -// @transient var testUtil: HBaseTestingUtility = null -// -// case class MyTable(col1: String, col2: Byte, col3: Short, col4: Int, col5: Long, -// col6: Float, col7: Double) -// -// val DbName = "mynamespace" -// val TabName = "myTable" -// val HbaseTabName = "hbasetaba" -// -// def ctxSetup() { -// if (useMiniCluster) { -// logger.info(s"Spin up hbase minicluster w/ $NMasters mast, $NRegionServers RS, $NDataNodes dataNodes") -// testUtil = new HBaseTestingUtility -// config = testUtil.getConfiguration -// } else { -// config = HBaseConfiguration.create -// } -// // cluster = HBaseTestingUtility.createLocalHTU. -// // startMiniCluster(NMasters, NRegionServers, NDataNodes) -// // config = HBaseConfiguration.create -// config.set("hbase.regionserver.info.port", "-1") -// config.set("hbase.master.info.port", "-1") -// config.set("dfs.client.socket-timeout", "240000") -// config.set("dfs.datanode.socket.write.timeout", "240000") -// config.set("zookeeper.session.timeout", "240000") -// config.set("zookeeper.minSessionTimeout", "10") -// config.set("zookeeper.tickTime", "10") -// config.set("hbase.rpc.timeout", "240000") -// config.set("ipc.client.connect.timeout", "240000") -// config.set("dfs.namenode.stale.datanode.interva", "240000") -// config.set("hbase.rpc.shortoperation.timeout", "240000") -// config.set("hbase.regionserver.lease.period", "240000") -// -// if (useMiniCluster) { -// cluster = testUtil.startMiniCluster(NMasters, NRegionServers) -// println(s"# of region servers = ${cluster.countServedRegions}") -// } -// -// @transient val conf = new SparkConf -// val SparkPort = 11223 -// conf.set("spark.ui.port", SparkPort.toString) -// // @transient val sc = new SparkContext(s"local[$NWorkers]", "HBaseTestsSparkContext", conf) -// hbContext = new HBaseSQLContext(TestSQLContext.sparkContext, config) -// -// catalog = hbContext.catalog -// hbaseAdmin = new HBaseAdmin(config) -// -// } -// -// def tableSetup() = { -// createTable() -// } -// -// def createTable() = { -// -// val createTable = useMiniCluster -// if (createTable) { -// try { -// hbContext.sql( s"""CREATE TABLE $TabName(col1 STRING, col2 BYTE, col3 SHORT, col4 INTEGER, -// col5 LONG, col6 FLOAT, col7 DOUBLE) -// MAPPED BY ($HbaseTabName, KEYS=[col7, col1, col3], COLS=[col2=cf1.cq11, -// col4=cf1.cq12, col5=cf2.cq21, col6=cf2.cq22])""" -// .stripMargin) -// } catch { -// case e: TableExistsException => -// e.printStackTrace -// } -// -// try { -// val hdesc = new HTableDescriptor(TableName.valueOf(HbaseTabName)) -// Array(new HColumnDescriptor("cf1"), new HColumnDescriptor("cf2")).foreach { f => -// hdesc.addFamily(f) -// } -// hbaseAdmin.createTable(hdesc) -// } catch { -// case e: TableExistsException => -// e.printStackTrace -// } -// } -// -// if (!hbaseAdmin.tableExists(HbaseTabName)) { -// throw new IllegalArgumentException("where is our table?") -// } -// -// } -// -// def testGetTable = { -// println("get table") -// // prepare the test data -// 
HBaseCatalog.getKeysFromAllMetaTableRows(config) -// .foreach { r => logger.info(s"Metatable Rowkey: ${new String(r)}")} -// -// val oresult = catalog.getTable(TabName) -// assert(oresult.isDefined) -// val result = oresult.get -// assert(result.tablename == TabName) -// assert(result.hbaseTableName.tableName.getNameAsString == DbName + ":" + HbaseTabName) -// assert(result.colFamilies.size == 2) -// assert(result.columns.columns.size == 4) -// assert(result.rowKeyColumns.columns.size == 3) -// val relation = catalog.lookupRelation(Some(DbName), TabName) -// val hbRelation = relation.asInstanceOf[HBaseRelation] -// assert(hbRelation.colFamilies == Seq("cf1", "cf2")) -// assert(Seq("col7", "col1", "col3").zip(hbRelation.partitionKeys) -// .forall { x => x._1 == x._2.name}) -// val rkColumns = new Columns(Seq(KeyColumn("col7", null, "col7", DoubleType), -// KeyColumn("col1", null, "col1", StringType), -// KeyColumn("col3", null, "col3", ShortType))) -// assert(hbRelation.catalogTable.rowKeyColumns.equals(rkColumns)) -// assert(relation.childrenResolved) -// } -// -// def checkHBaseTableExists(hbaseTable: String) = { -// hbaseAdmin.listTableNames.foreach { t => println(s"table: $t")} -// val tname = TableName.valueOf(hbaseTable) -// hbaseAdmin.tableExists(tname) -// } -// -// def insertTestData() = { -// if (!checkHBaseTableExists(HbaseTabName)) { -// throw new IllegalStateException(s"Unable to find table ${HbaseTabName}") -// } -// val htable = new HTable(config, HbaseTabName) -// -// var put = new Put(makeRowKey(12345.0, "Michigan", 12345)) -// addRowVals(put, (123).toByte, 12345678, 12345678901234L, 1234.5678F) -// htable.put(put) -// put = new Put(makeRowKey(456789.0, "Michigan", 4567)) -// addRowVals(put, (456).toByte, 456789012, 4567890123446789L, 456.78901F) -// htable.put(put) -// htable.close -// -// } -// -// val runMultiTests: Boolean = false -// -// def testQuery() { -// ctxSetup() -// createTable() -// // testInsertIntoTable -// // testHBaseScanner -// -// if (!checkHBaseTableExists(HbaseTabName)) { -// throw new IllegalStateException(s"Unable to find table ${HbaseTabName}") -// } -// -// insertTestData -// +package org.apache.spark.sql.hbase + +import java.io.{ObjectOutputStream, ByteArrayOutputStream, DataOutputStream} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase._ +import org.apache.hadoop.hbase.client._ +import org.apache.log4j.Logger +import org.apache.spark +import org.apache.spark.sql.SchemaRDD +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.hbase.DataTypeUtils._ +import org.apache.spark.sql.hbase.HBaseCatalog._ +import org.apache.spark.sql.hbase.KeyColumn +import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.test.TestSQLContext._ +import org.apache.spark.{Logging, SparkConf, sql} +import org.scalatest.{BeforeAndAfterAll, FunSuite} +import spark.sql.Row +import org.apache.hadoop.hbase.util.Bytes +import scala.collection.mutable.ArrayBuffer + +/** +* HBaseIntegrationTest +* Created by sboesch on 9/27/14. 
+*/ +object HBaseMainTest extends FunSuite with BeforeAndAfterAll with Logging { + @transient val logger = Logger.getLogger(getClass.getName) + + val useMiniCluster: Boolean = false + + val NMasters = 1 + val NRegionServers = 1 + // 3 + val NDataNodes = 0 + + val NWorkers = 1 + + @transient var cluster: MiniHBaseCluster = null + @transient var config: Configuration = null + @transient var hbaseAdmin: HBaseAdmin = null + @transient var hbContext: HBaseSQLContext = null + @transient var catalog: HBaseCatalog = null + @transient var testUtil: HBaseTestingUtility = null + + case class MyTable(col1: String, col2: Byte, col3: Short, col4: Int, col5: Long, + col6: Float, col7: Double) + + val DbName = "mynamespace" + val TabName = "myTable" + val HbaseTabName = "hbaseTableName" + + def ctxSetup() { + if (useMiniCluster) { + logger.info(s"Spin up hbase minicluster w/ $NMasters mast, $NRegionServers RS, $NDataNodes dataNodes") + testUtil = new HBaseTestingUtility + config = testUtil.getConfiguration + } else { + config = HBaseConfiguration.create + } + // cluster = HBaseTestingUtility.createLocalHTU. + // startMiniCluster(NMasters, NRegionServers, NDataNodes) + // config = HBaseConfiguration.create + config.set("hbase.regionserver.info.port", "-1") + config.set("hbase.master.info.port", "-1") + config.set("dfs.client.socket-timeout", "240000") + config.set("dfs.datanode.socket.write.timeout", "240000") + config.set("zookeeper.session.timeout", "240000") + config.set("zookeeper.minSessionTimeout", "10") + config.set("zookeeper.tickTime", "10") + config.set("hbase.rpc.timeout", "240000") + config.set("ipc.client.connect.timeout", "240000") + config.set("dfs.namenode.stale.datanode.interva", "240000") + config.set("hbase.rpc.shortoperation.timeout", "240000") + config.set("hbase.regionserver.lease.period", "240000") + + if (useMiniCluster) { + cluster = testUtil.startMiniCluster(NMasters, NRegionServers) + println(s"# of region servers = ${cluster.countServedRegions}") + } + + @transient val conf = new SparkConf + val SparkPort = 11223 + conf.set("spark.ui.port", SparkPort.toString) + // @transient val sc = new SparkContext(s"local[$NWorkers]", "HBaseTestsSparkContext", conf) + hbContext = new HBaseSQLContext(TestSQLContext.sparkContext) + + catalog = hbContext.catalog + hbaseAdmin = new HBaseAdmin(config) + + } + + def tableSetup() = { + createTable() + } + + def createTable() = { + + val createTable = useMiniCluster + if (createTable) { + try { + hbContext.sql( s"""CREATE TABLE $TabName(col1 STRING, col2 BYTE, col3 SHORT, col4 INTEGER, + col5 LONG, col6 FLOAT, col7 DOUBLE) + MAPPED BY ($HbaseTabName, KEYS=[col7, col1, col3], COLS=[col2=cf1.cq11, + col4=cf1.cq12, col5=cf2.cq21, col6=cf2.cq22])""" + .stripMargin) + } catch { + case e: TableExistsException => + e.printStackTrace + } + + try { + val hdesc = new HTableDescriptor(TableName.valueOf(HbaseTabName)) + Array(new HColumnDescriptor("cf1"), new HColumnDescriptor("cf2")).foreach { f => + hdesc.addFamily(f) + } + hbaseAdmin.createTable(hdesc) + } catch { + case e: TableExistsException => + e.printStackTrace + } + } + + if (!hbaseAdmin.tableExists(HbaseTabName)) { + throw new IllegalArgumentException("where is our table?") + } + + } + + def checkHBaseTableExists(hbaseTable: String) = { + hbaseAdmin.listTableNames.foreach { t => println(s"table: $t")} + val tname = TableName.valueOf(hbaseTable) + hbaseAdmin.tableExists(tname) + } + + def insertTestData() = { + if (!checkHBaseTableExists(HbaseTabName)) { + throw new IllegalStateException(s"Unable to 
find table ${HbaseTabName}") + } + val htable = new HTable(config, HbaseTabName) + + var put = new Put(makeRowKey(12345.0, "Upen", 12345)) + addRowVals(put, (123).toByte, 12345678, 12345678901234L, 1234.5678F) + htable.put(put) + put = new Put(makeRowKey(456789.0, "Michigan", 4567)) + addRowVals(put, (456).toByte, 456789012, 4567890123446789L, 456.78901F) + htable.put(put) + htable.close + + } + + val runMultiTests: Boolean = false + + def testQuery() { + ctxSetup() + createTable() + // testInsertIntoTable + // testHBaseScanner + + if (!checkHBaseTableExists(HbaseTabName)) { + throw new IllegalStateException(s"Unable to find table ${HbaseTabName}") + } + + insertTestData + // var results: SchemaRDD = null // var data: Array[sql.Row] = null // @@ -271,152 +248,99 @@ // """.stripMargin) // printResults("Aggregates on non-rowkeys", results) // } -// } -// -// def printResults(msg: String, results: SchemaRDD) = { -// if (results.isInstanceOf[TestingSchemaRDD]) { -// val data = results.asInstanceOf[TestingSchemaRDD].collectPartitions -// println(s"For test [$msg]: Received data length=${data(0).length}: ${ -// data(0).mkString("RDD results: {", "],[", "}") -// }") -// } else { -// val data = results.collect -// println(s"For test [$msg]: Received data length=${data.length}: ${ -// data.mkString("RDD results: {", "],[", "}") -// }") -// } -// -// } -// -// def createTableTest2() { -// ctxSetup() -// // Following fails with Unresolved: -// // Col1 Sort is unresolved -// // Col4 and col2 Aggregation are unresolved (interesting col3 IS resolved) -// // val results = hbContext.sql(s"""SELECT col4, col1, col3, col2 FROM $TabName -// // WHERE col1 ='Michigan' and col7 >= 2500.0 and col3 >= 35 and col3 <= 50 group by col7, col1 -// // ORDER BY col1 DESC""" -// // .stripMargin) -// -// hbContext.sql( s"""CREATE TABLE $DbName.$TabName(col1 STRING, col2 BYTE, col3 SHORT, col4 INTEGER, -// col5 LONG, col6 FLOAT, col7 DOUBLE) -// MAPPED BY ($HbaseTabName, KEYS=[col7, col1, col3], COLS=[col2=cf1.cq11, -// col4=cf1.cq12, col5=cf2.cq21, col6=cf2.cq22])""" -// .stripMargin) -// -// val catTab = catalog.getTable(TabName) -// assert(catTab.get.tablename == TabName) -// -// testGetTable -// } -// -// def testInsertIntoTable() = { -// logger.info("Insert data into the test table using applySchema") -// ctxSetup() -// tableSetup() -// // import hbContext.createSchemaRDD -// val myRows = hbContext.sparkContext.parallelize(Range(1, 21).map { ix => -// MyTable(s"Michigan", ix.toByte, (ix.toByte * 256).asInstanceOf[Short], ix.toByte * 65536, ix.toByte * 65563L * 65536L, -// (ix.toByte * 65536.0).asInstanceOf[Float], ix.toByte * 65536.0D * 65563.0D) -// }) -// -// // import org.apache.spark.sql.execution.ExistingRdd -// // val myRowsSchema = ExistingRdd.productToRowRdd(myRows) -// // hbContext.applySchema(myRowsSchema, schema) -// val TempTabName = "MyTempTab" -// myRows.registerTempTable(TempTabName) -// -// val localData = myRows.collect -// -// hbContext.sql( -// s"""insert into $TabName select * from $TempTabName""".stripMargin) -// -// val hbRelation = catalog.lookupRelation(Some(DbName), TabName).asInstanceOf[HBaseRelation] -// -// val hbasePlanner = new SparkPlanner with HBaseStrategies { -// @transient override val hbaseContext: HBaseSQLContext = hbContext -// } -// -// val myRowsSchemaRdd = hbContext.createSchemaRDD(myRows) -// val insertPlan = hbasePlanner.InsertIntoHBaseTableFromRdd(hbRelation, -// myRowsSchemaRdd)(hbContext) -// -// var rowKeysWithRows = myRowsSchemaRdd.zip( -// 
HBaseRelation.rowKeysFromRows(myRowsSchemaRdd, hbRelation)) -// // var keysCollect = rowKeysWithRows.collect -// HBaseStrategies.putToHBase(myRows, hbRelation, hbContext) -// -// val preparedInsertRdd = insertPlan.execute -// val executedInsertRdd = preparedInsertRdd.collect -// -// val rowsRdd = myRowsSchemaRdd -// val rowKeysWithRows2 = rowsRdd.zip( -// HBaseRelation.rowKeysFromRows(rowsRdd, hbRelation)) -// HBaseStrategies.putToHBase(rowsRdd, hbRelation, hbContext) -// -// -// cluster.shutdown -// } -// -// import org.apache.spark.sql.hbase.HBaseRelation.RowKeyParser -// -// def makeRowKey(col7: Double, col1: String, col3: Short) = { -// val size = 1 + sizeOf(col7) + sizeOf(col1) + sizeOf(col3) + 3 * 2 -// + RowKeyParser.DimensionCountLen -// // val barr = new Array[Byte](size) -// val bos = new ByteArrayOutputStream(size) -// val dos = new DataOutputStream(bos) -// dos.writeByte(HBaseRelation.RowKeyParser.Version1) -// dos.writeDouble(col7) -// dos.writeBytes(col1) -// dos.writeShort(col3) -// var off = 1 -// dos.writeShort(off) -// off += sizeOf(col7) -// dos.writeShort(off) -// off += sizeOf(col1) -// dos.writeShort(off) -// dos.writeByte(3.toByte) -// val s = bos.toString -// // println((s"MakeRowKey: [${RowKeyParser.show(bos.toByteArray)}]") -// println(s"MakeRowKey: [${s}]") -// bos.toByteArray -// } -// -// def addRowVals(put: Put, col2: Byte, col4: Int, col5: Long, col6: Float) = { -// // val barr = new Array[Byte](size) -// var bos = new ByteArrayOutputStream() -// var dos = new DataOutputStream(bos) -// dos.writeByte(col2) -// put.add(s2b("cf1"), s2b("cq11"), bos.toByteArray) -// bos = new ByteArrayOutputStream() -// dos = new DataOutputStream(bos) -// dos.writeInt(col4) -// put.add(s2b("cf1"), s2b("cq12"), bos.toByteArray) -// bos = new ByteArrayOutputStream() -// dos = new DataOutputStream(bos) -// dos.writeLong(col5) -// put.add(s2b("cf2"), s2b("cq21"), bos.toByteArray) -// bos = new ByteArrayOutputStream() -// dos = new DataOutputStream(bos) -// dos.writeFloat(col6) -// put.add(s2b("cf2"), s2b("cq22"), bos.toByteArray) -// } -// -// def testHBaseScanner() = { -// val scan = new Scan -// val htable = new HTable(config, HbaseTabName) -// val scanner = htable.getScanner(scan) -// var res: Result = null -// do { -// res = scanner.next -// if (res != null) println(s"Row ${res.getRow} has map=${res.getNoVersionMap.toString}") -// } while (res != null) -// } -// -// def main(args: Array[String]) = { -// // testInsertIntoTable -// testQuery -// } -// -//} + } + + def printResults(msg: String, results: SchemaRDD) = { + if (results.isInstanceOf[TestingSchemaRDD]) { + val data = results.asInstanceOf[TestingSchemaRDD].collectPartitions + println(s"For test [$msg]: Received data length=${data(0).length}: ${ + data(0).mkString("RDD results: {", "],[", "}") + }") + } else { + val data = results.collect + println(s"For test [$msg]: Received data length=${data.length}: ${ + data.mkString("RDD results: {", "],[", "}") + }") + } + + } + + val allColumns: Seq[AbstractColumn] = Seq( + KeyColumn("col1", StringType, 1), + NonKeyColumn("col2", ByteType, "cf1", "cq11"), + KeyColumn("col3", ShortType, 2), + NonKeyColumn("col4", IntegerType, "cf1", "cq12"), + NonKeyColumn("col5", LongType, "cf2", "cq21"), + NonKeyColumn("col6", FloatType, "cf2", "cq22"), + KeyColumn("col7", DoubleType, 0) + ) + + val keyColumns = allColumns.filter(_.isInstanceOf[KeyColumn]) + .asInstanceOf[Seq[KeyColumn]].sortBy(_.order) + + + def makeRowKey(col7: Double, col1: String, col3: Short) = { + val row = new 
GenericRow(Array(col7, col1, col3)) + val key0 = DataTypeUtils.getRowColumnFromHBaseRawType(row, 0, DoubleType) + val key1 = DataTypeUtils.getRowColumnFromHBaseRawType(row, 1, StringType) + val key2 = DataTypeUtils.getRowColumnFromHBaseRawType(row, 2, ShortType) + + encodingRawKeyColumns(Seq(key0,key1,key2)) + } + + /** + * create row key based on key columns information + * @param rawKeyColumns sequence of byte array representing the key columns + * @return array of bytes + */ + def encodingRawKeyColumns(rawKeyColumns: Seq[HBaseRawType]): HBaseRawType = { + var buffer = ArrayBuffer[Byte]() + val delimiter: Byte = 0 + var index = 0 + for (rawKeyColumn <- rawKeyColumns) { + val keyColumn = keyColumns(index) + buffer = buffer ++ rawKeyColumn + if (keyColumn.dataType == StringType) { + buffer += delimiter + } + index = index + 1 + } + buffer.toArray + } + + def addRowVals(put: Put, col2: Byte, col4: Int, col5: Long, col6: Float) = { + // val barr = new Array[Byte](size) + var bos = new ByteArrayOutputStream() + var dos = new DataOutputStream(bos) + dos.writeByte(col2) + put.add(Bytes.toBytes("cf1"), Bytes.toBytes("cq11"), bos.toByteArray) + bos = new ByteArrayOutputStream() + dos = new DataOutputStream(bos) + dos.writeInt(col4) + put.add(Bytes.toBytes("cf1"), Bytes.toBytes("cq12"), bos.toByteArray) + bos = new ByteArrayOutputStream() + dos = new DataOutputStream(bos) + dos.writeLong(col5) + put.add(Bytes.toBytes("cf2"), Bytes.toBytes("cq21"), bos.toByteArray) + bos = new ByteArrayOutputStream() + dos = new DataOutputStream(bos) + dos.writeFloat(col6) + put.add(Bytes.toBytes("cf2"), Bytes.toBytes("cq22"), bos.toByteArray) + } + + def testHBaseScanner() = { + val scan = new Scan + val htable = new HTable(config, HbaseTabName) + val scanner = htable.getScanner(scan) + var res: Result = null + do { + res = scanner.next + if (res != null) println(s"Row ${res.getRow} has map=${res.getNoVersionMap.toString}") + } while (res != null) + } + + def main(args: Array[String]) = { + testQuery + } + +} diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/RowKeyParserSuite.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/RowKeyParserSuite.scala index 8ddc4cbc8c2d8..646d51968c671 100644 --- a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/RowKeyParserSuite.scala +++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/RowKeyParserSuite.scala @@ -1,98 +1,115 @@ -//package org.apache.spark.sql.hbase -// -//import java.io.{ByteArrayOutputStream, DataOutputStream} -// -//import org.apache.log4j.Logger -//import org.apache.spark.sql.catalyst.expressions.Row -//import org.apache.spark.sql.catalyst.types._ -//import org.apache.spark.sql.hbase.DataTypeUtils._ -//import org.apache.spark.sql.hbase.HBaseCatalog.{Column, Columns} -//import org.scalatest.{FunSuite, ShouldMatchers} -// -///** -// * CompositeRowKeyParserTest -// * Created by sboesch on 9/25/14. 
-// */ -// -//case class TestCall(callId: Int, userId: String, duration: Double) -// -//class RowKeyParserSuite extends FunSuite with ShouldMatchers { -// @transient val logger = Logger.getLogger(getClass.getName) -// -// import org.apache.spark.sql.hbase.HBaseRelation.RowKeyParser -// -// def makeRowKey(col7: Double, col1: String, col3: Short) = { -// val size = 1 + sizeOf(col7) + sizeOf(col1) + sizeOf(col3) + 3 * 2 + -// RowKeyParser.DimensionCountLen -// // val barr = new Array[Byte](size) -// val bos = new ByteArrayOutputStream(size) -// val dos = new DataOutputStream(bos) -// dos.writeByte(RowKeyParser.Version1) -// dos.writeDouble(col7) -// dos.writeBytes(col1) -// dos.writeShort(col3) -// var off = 1 -// dos.writeShort(off) -// off += sizeOf(col7) -// dos.writeShort(off) -// off += sizeOf(col1) -// dos.writeShort(off) -// dos.writeByte(3.toByte) -// val s = bos.toString -// // println((s"MakeRowKey: [${RowKeyParser.show(bos.toByteArray)}]") -// println(s"MakeRowKey: [${s}]") -// bos.toByteArray -// } -// -// test("rowkey test") { -// -// val cols = Range(0, 3).zip(Seq(DoubleType, StringType, ShortType)) -// .map { case (ix, dataType) => -// KeyColumn(s"col{ix+10}", s"cf${ix + 1}", s"cq${ix + 10}", dataType) -// }.toSeq -// -// val pat = makeRowKey(12345.6789, "Column1-val", 12345) -// val parsedKeyMap = RowKeyParser.parseRowKeyWithMetaData(cols, pat) -// println(s"parsedKeyWithMetaData: ${parsedKeyMap.toString}") -// // assert(parsedKeyMap === Map("col7" ->(12345.6789, "col1" -> "Column1-val", "col3" -> 12345))) -// // assert(parsedKeyMap.values.toList.sorted === List(12345.6789, "Column1-val",12345)) -// -// val parsedKey = RowKeyParser.parseRowKey(pat) -// println(s"parsedRowKey: ${parsedKey.toString}") -// -// } -// -// test("CreateKeyFromCatalystRow") { -// import org.apache.spark.sql.catalyst.types._ -// val schema: StructType = new StructType(Seq( -// new StructField("callId", IntegerType, false), -// new StructField("userId", StringType, false), -// new StructField("cellTowers", StringType, true), -// new StructField("callType", ByteType, false), -// new StructField("deviceId", LongType, false), -// new StructField("duration", DoubleType, false)) -// ) -// -// val keyCols = new Columns(Seq( -// KeyColumn("userId", "cf1", "useridq", StringType), -// KeyColumn("callId", "cf1", "callidq", IntegerType), -// KeyColumn("deviceId", "cf2", "deviceidq", LongType) -// )) -// // val cols = new Columns(Seq( -// // Column("cellTowers","cf2","cellTowersq",StringType), -// // Column("callType","cf1","callTypeq",ByteType), -// // Column("duration","cf2","durationq",DoubleType) -// // )) -// val row = Row(12345678, "myUserId1", "tower1,tower9,tower3", 22.toByte, 111223445L, 12345678.90123) -// val key = RowKeyParser.createKeyFromCatalystRow(schema, keyCols, row) -// assert(key.length == 29) -// val parsedKey = RowKeyParser.parseRowKey(key) -// assert(parsedKey.length == 3) -// import org.apache.spark.sql.hbase.DataTypeUtils.cast -// assert(cast(parsedKey(0), StringType) == "myUserId1") -// assert(cast(parsedKey(1), IntegerType) == 12345678) -// assert(cast(parsedKey(2), LongType) == 111223445L) -// -// } -// -//} +package org.apache.spark.sql.hbase + +import org.apache.log4j.Logger +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.hbase.DataTypeUtils._ +import org.scalatest.{FunSuite, ShouldMatchers} + +import scala.collection.mutable.ArrayBuffer + +/** + * CompositeRowKeyParserTest + * Created by sboesch on 9/25/14. 
+ */ + +case class TestCall(callId: Int, userId: String, duration: Double) + +class RowKeyParserSuite extends FunSuite with ShouldMatchers { + @transient val logger = Logger.getLogger(getClass.getName) + + val allColumns: Seq[AbstractColumn] = Seq( + KeyColumn("callId", IntegerType, 1), + KeyColumn("userId", StringType, 2), + NonKeyColumn("cellTowers", StringType, "cf2", "cellTowersq"), + NonKeyColumn("callType", ByteType, "cf1", "callTypeq"), + KeyColumn("deviceId", LongType, 0), + NonKeyColumn("duration", DoubleType, "cf2", "durationq") + ) + + val keyColumns = allColumns.filter(_.isInstanceOf[KeyColumn]) + .asInstanceOf[Seq[KeyColumn]].sortBy(_.order) + val nonKeyColumns = allColumns.filter(_.isInstanceOf[NonKeyColumn]) + .asInstanceOf[Seq[NonKeyColumn]] + + /** + * create row key based on key columns information + * @param rawKeyColumns sequence of byte array representing the key columns + * @return array of bytes + */ + def encodingRawKeyColumns(rawKeyColumns: Seq[HBaseRawType]): HBaseRawType = { + var buffer = ArrayBuffer[Byte]() + val delimiter: Byte = 0 + var index = 0 + for (rawKeyColumn <- rawKeyColumns) { + val keyColumn = keyColumns(index) + buffer = buffer ++ rawKeyColumn + if (keyColumn.dataType == StringType) { + buffer += delimiter + } + index = index + 1 + } + buffer.toArray + } + + /** + * get the sequence of key columns from the byte array + * @param rowKey array of bytes + * @return sequence of byte array + */ + def decodingRawKeyColumns(rowKey: HBaseRawType): Seq[HBaseRawType] = { + var rowKeyList = List[HBaseRawType]() + val delimiter: Byte = 0 + var index = 0 + for (keyColumn <- keyColumns) { + var buffer = ArrayBuffer[Byte]() + val dataType = keyColumn.dataType + if (dataType == StringType) { + while (index < rowKey.length && rowKey(index) != delimiter) { + buffer += rowKey(index) + index = index + 1 + } + index = index + 1 + } + else { + val length = NativeType.defaultSizeOf(dataType.asInstanceOf[NativeType]) + for (i <- 0 to (length - 1)) { + buffer += rowKey(index) + index = index + 1 + } + } + rowKeyList = rowKeyList :+ buffer.toArray + } + rowKeyList + } + + test("CreateKeyFromCatalystRow") { + val row = Row(12345678, "myUserId1", "tower1,tower9,tower3", 22.toByte, 111223445L, 12345678.90123) + val allColumnsWithIndex = allColumns.zipWithIndex + val rawKeyColsWithKeyIndex: Seq[(HBaseRawType, Int)] = { + for { + (column, index) <- allColumnsWithIndex + if (column.isInstanceOf[KeyColumn]) + key = column.asInstanceOf[KeyColumn] + } yield ( + DataTypeUtils.getRowColumnFromHBaseRawType(row, index, column.dataType), + key.order) + } + + val rawKeyCols = rawKeyColsWithKeyIndex.sortBy(_._2).map(_._1) + val rowkeyA = encodingRawKeyColumns(rawKeyCols) + val parsedKey = decodingRawKeyColumns(rowkeyA) + + val mr = new GenericMutableRow(allColumns.length) + parsedKey.zipWithIndex.foreach{ + case (rawkey, keyIndex) => { + val key = keyColumns(keyIndex) + val index = allColumns.indexOf(key) + setRowColumnFromHBaseRawType( + mr, index, rawkey, key.dataType) + } + } + + println(mr.getLong(4)) + } +} From a7a1c64cbd5a0c3b4b8f74f552230d974c633bed Mon Sep 17 00:00:00 2001 From: Jacky Li Date: Thu, 30 Oct 2014 15:10:06 -0700 Subject: [PATCH 10/10] modify according to comment --- .../org/apache/spark/sql/hbase/HBaseBulkloadParserTest.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBulkloadParserTest.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBulkloadParserTest.scala index 
262a618adcea2..19e0cf17190ae 100644 --- a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBulkloadParserTest.scala +++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBulkloadParserTest.scala @@ -24,9 +24,7 @@ import org.scalatest.{FunSuite, Matchers} /** * Test Suite for HBase bulkload feature - * Created by jackylk on 2014/10/25. */ - class HBaseBulkloadParserTest extends FunSuite with Matchers { // Test if we can parse 'LOAD DATA LOCAL INPATH './usr/file.csv' INTO TABLE tb'