From fd8d1218c26095aa5e18bce2002fda7d3274cbe2 Mon Sep 17 00:00:00 2001 From: haosdent Date: Sat, 22 Mar 2014 14:57:44 +0800 Subject: [PATCH 01/11] Add spark-hbase and class-level documents. --- external/hbase/pom.xml | 92 ++++++++++++ .../apache/spark/nosql/hbase/HBaseUtils.scala | 60 ++++++++ .../spark/nosql/hbase/SparkHBaseWriter.scala | 139 ++++++++++++++++++ .../hbase/src/test/resources/log4j.properties | 29 ++++ .../apache/spark/nosql/hbase/HBaseSuite.scala | 48 ++++++ pom.xml | 1 + 6 files changed, 369 insertions(+) create mode 100644 external/hbase/pom.xml create mode 100644 external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala create mode 100644 external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala create mode 100644 external/hbase/src/test/resources/log4j.properties create mode 100644 external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala diff --git a/external/hbase/pom.xml b/external/hbase/pom.xml new file mode 100644 index 0000000000000..e73eb88c07396 --- /dev/null +++ b/external/hbase/pom.xml @@ -0,0 +1,92 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent + 1.0.0-SNAPSHOT + ../../pom.xml + + + org.apache.spark + spark-nosql-hbase + jar + Spark Project External HBase + http://spark.apache.org/ + + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.hbase + hbase + ${hbase.version} + + + org.apache.hbase + hbase + ${hbase.version} + test-jar + test + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.hadoop + hadoop-test + ${hadoop.version} + test + + + org.scalatest + scalatest_${scala.binary.version} + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.scalatest + scalatest-maven-plugin + + + + diff --git a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala new file mode 100644 index 0000000000000..d98a1c1800d2d --- /dev/null +++ b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.nosql.hbase + +import org.apache.hadoop.io.Text +import org.apache.spark.rdd.RDD + +/** + * A public object that provide HBase supports. + * You could save RDD into HBase through [[org.apache.spark.nosql.hbase.HBaseUtils.saveAsHBaseTable]] method. 
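+ *
+ * A rough usage sketch (assuming an active `SparkContext` named `sc`; the ZooKeeper quorum,
+ * table name and column layout below are illustrative placeholders mirroring the test suite
+ * in this patch):
+ * {{{
+ *   import org.apache.hadoop.hbase.util.Bytes
+ *   import org.apache.hadoop.io.Text
+ *
+ *   val records = sc.makeRDD(1 to 3).map(x => new Text("a" + x + " 1.0"))
+ *   val columns = List(new HBaseColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual0"), HBaseType.Float))
+ *   HBaseUtils.saveAsHBaseTable(records, "10.232.98.10,10.232.98.11", "2181", "hbase-apache",
+ *     "test", HBaseType.String, columns, ' ')
+ * }}}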
+ */ +object HBaseUtils { + + /** + * Save [[org.apache.spark.rdd.RDD[Text]]] as a HBase table + * @param rdd [[org.apache.spark.rdd.RDD[Text]]] + * @param zkHost the zookeeper hosts. e.g. "10.232.98.10,10.232.98.11,10.232.98.12" + * @param zkPort the zookeeper client listening port. e.g. "2181" + * @param zkNode the zookeeper znode of HBase. e.g. "hbase-apache" + * @param table the name of table which we save records + * @param rowkeyType the type of rowkey. [[org.apache.spark.nosql.hbase.HBaseType]] + * @param columns the column list. [[org.apache.spark.nosql.hbase.HBaseColumn]] + * @param delimiter the delimiter which used to split record into fields + */ + def saveAsHBaseTable(rdd: RDD[Text], zkHost: String, zkPort: String, zkNode: String, + table: String, rowkeyType: String, columns: List[HBaseColumn], delimiter: Char) { + val conf = new HBaseConf(zkHost, zkPort, zkNode, table, rowkeyType, columns, delimiter) + + def writeToHBase(iter: Iterator[Text]) { + val writer = new SparkHBaseWriter(conf) + + try { + writer.init() + while (iter.hasNext) { + val record = iter.next() + writer.write(record) + } + } finally { + writer.close() + } + } + + rdd.foreachPartition(writeToHBase) + } +} diff --git a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala new file mode 100644 index 0000000000000..435688be81f5a --- /dev/null +++ b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.nosql.hbase + +import org.apache.hadoop.hbase.client.{Put, HTable} +import org.apache.hadoop.io.Text +import org.apache.hadoop.hbase.util.Bytes +import org.apache.commons.codec.binary.Hex +import org.apache.hadoop.hbase.HConstants +import org.apache.hadoop.conf.Configuration +import java.io.IOException + +/** + * Internal helper class that saves an RDD using a HBase OutputFormat. This is only public + * because we need to access this class from the `spark` package to use some package-private HBase + * functions, but this class should not be used directly by users. 
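+ *
+ * The intended per-partition lifecycle, as driven by
+ * [[org.apache.spark.nosql.hbase.HBaseUtils.saveAsHBaseTable]] (a rough sketch; `iter` stands
+ * for the partition's `Iterator[Text]`):
+ * {{{
+ *   val writer = new SparkHBaseWriter(conf)
+ *   try {
+ *     writer.init()
+ *     iter.foreach(record => writer.write(record))
+ *   } finally {
+ *     writer.close()
+ *   }
+ * }}}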
+ */ +private[apache] +class SparkHBaseWriter(conf: HBaseConf) { + + private var htable: HTable = null + + val zkHost = conf.zkHost + val zkPort = conf.zkPort + val zkNode = conf.zkNode + val table = conf.table + val rowkeyType = conf.rowkeyType + val columns = conf.columns + val delimiter = conf.delimiter + + def init() { + val conf = new Configuration() + conf.set(HConstants.ZOOKEEPER_QUORUM, zkHost) + conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, zkPort) + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkNode) + htable = new HTable(conf, table) + // Use default writebuffersize to submit batch puts + htable.setAutoFlush(false) + } + + /** + * Convert field to bytes + * @param field split by delimiter from record + * @param kind the type of field + * @return + */ + def toByteArr(field: String, kind: String) = kind match { + case HBaseType.Boolean => Bytes.toBytes(field.toBoolean) + case HBaseType.Short => Bytes.toBytes(field.toShort) + case HBaseType.Int => Bytes.toBytes(field.toInt) + case HBaseType.Long => Bytes.toBytes(field.toLong) + case HBaseType.Float => Bytes.toBytes(field.toFloat) + case HBaseType.Double => Bytes.toBytes(field.toDouble) + case HBaseType.String => Bytes.toBytes(field) + case HBaseType.Bytes => Hex.decodeHex(field.toCharArray) + case _ => throw new IOException("Unsupported data type.") + } + + /** + * Convert a string record to [[org.apache.hadoop.hbase.client.Put]] + * @param record + * @return + */ + def parseRecord(record: String) = { + val fields = record.split(delimiter) + val put = new Put(toByteArr(fields(0), rowkeyType)) + + List.range(1, fields.size) foreach { + i => put.add(columns(i - 1).family, columns(i - 1).qualifier, + toByteArr(fields(i), columns(i - 1).typ)) + } + + put + } + + def write(record: Text) { + val put = parseRecord(record.toString) + htable.put(put) + } + + def close() { + htable.close() + } +} + +/** + * Contains the types which supported to + * parse from [[java.lang.String]] into [[scala.Array[ B y t e]]] + */ +object HBaseType { + val Boolean = "bool" + val Short = "short" + val Int = "int" + val Long = "long" + val Float = "float" + val Double = "double" + val String = "string" + val Bytes = "bytes" +} + +/** + * A representation of HBase Column + * @param family HBase Column Family + * @param qualifier HBase Column Qualifier + * @param typ type [[org.apache.spark.nosql.hbase.HBaseType]] + */ +class HBaseColumn(val family: Array[Byte], val qualifier: Array[Byte], val typ: String) + extends Serializable + +/** + * A representation of HBase Configuration. + * It contains important parameters which used to create connection to HBase Servers + * @param zkHost the zookeeper hosts. e.g. "10.232.98.10,10.232.98.11,10.232.98.12" + * @param zkPort the zookeeper client listening port. e.g. "2181" + * @param zkNode the zookeeper znode of HBase. e.g. "hbase-apache" + * @param table the name of table which we save records + * @param rowkeyType the type of rowkey. [[org.apache.spark.nosql.hbase.HBaseType]] + * @param columns the column list. 
[[org.apache.spark.nosql.hbase.HBaseColumn]] + * @param delimiter the delimiter which used to split record into fields + */ +class HBaseConf(val zkHost: String, val zkPort: String, val zkNode: String, + val table: String, val rowkeyType: String, val columns: List[HBaseColumn], + val delimiter: Char) + extends Serializable \ No newline at end of file diff --git a/external/hbase/src/test/resources/log4j.properties b/external/hbase/src/test/resources/log4j.properties new file mode 100644 index 0000000000000..28606bc80f56c --- /dev/null +++ b/external/hbase/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=external/hbase/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala b/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala new file mode 100644 index 0000000000000..dc5f46fc79285 --- /dev/null +++ b/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala @@ -0,0 +1,48 @@ +package org.apache.spark.nosql.hbase + +import org.scalatest.FunSuite +import org.apache.hadoop.io.Text +import org.apache.spark.{SparkContext, LocalSparkContext} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.{HConstants, HBaseTestingUtility} +import org.apache.hadoop.hbase.client.{Scan, HTable} + +class HBaseSuite extends FunSuite with LocalSparkContext { + + test("write SequenceFile using HBase") { + sc = new SparkContext("local", "test") + val nums = sc.makeRDD(1 to 3).map(x => new Text("a" + x + " 1.0")) + + val table = "test" + val rowkeyType = HBaseType.String + val cfBytes = Bytes.toBytes("cf") + val qualBytes = Bytes.toBytes("qual0") + val columns = List[HBaseColumn](new HBaseColumn(cfBytes, qualBytes, HBaseType.Float)) + val delimiter = ' ' + + val util = new HBaseTestingUtility() + util.startMiniCluster() + util.createTable(Bytes.toBytes(table), cfBytes) + val conf = util.getConfiguration + val zkHost = conf.get(HConstants.ZOOKEEPER_QUORUM) + val zkPort = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + val zkNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT) + + HBaseUtils.saveAsHBaseTable(nums, zkHost, zkPort, zkNode, table, rowkeyType, columns, delimiter) + + val htable = new 
HTable(conf, table)
+    val scan = new Scan()
+    val rs = htable.getScanner(scan)
+
+    var result = rs.next()
+    var i = 1
+    while (result != null) {
+      val rowkey = Bytes.toString(result.getRow)
+      assert(rowkey == "a" + i)
+      val value = Bytes.toFloat(result.getValue(cfBytes, qualBytes))
+      assert(value == 1.0)
+      result = rs.next()
+      i += 1
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 01341d21b7f23..a5fb8644c80c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,6 +101,7 @@
     <module>external/flume</module>
     <module>external/zeromq</module>
     <module>external/mqtt</module>
+    <module>external/hbase</module>
     <module>examples</module>

From b9a894c6e85e3edc8b66c7e3dbffa5d7377c0918 Mon Sep 17 00:00:00 2001
From: haosdent
Date: Sat, 22 Mar 2014 23:15:07 +0800
Subject: [PATCH 02/11] Fix visibility of SparkHBaseWriter.

---
 .../org/apache/spark/nosql/hbase/SparkHBaseWriter.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala
index 435688be81f5a..87ea44f080dd0 100644
--- a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala
+++ b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala
@@ -30,7 +30,7 @@ import java.io.IOException
  * because we need to access this class from the `spark` package to use some package-private HBase
  * functions, but this class should not be used directly by users.
  */
-private[apache]
+private[hbase]
 class SparkHBaseWriter(conf: HBaseConf) {
 
   private var htable: HTable = null
@@ -59,7 +59,7 @@ class SparkHBaseWriter(conf: HBaseConf) {
    * @param kind the type of field
    * @return
    */
-  def toByteArr(field: String, kind: String) = kind match {
+  def toByteArray(field: String, kind: String) = kind match {
     case HBaseType.Boolean => Bytes.toBytes(field.toBoolean)
@@ -78,11 +78,11 @@ class SparkHBaseWriter(conf: HBaseConf) {
    */
   def parseRecord(record: String) = {
     val fields = record.split(delimiter)
-    val put = new Put(toByteArr(fields(0), rowkeyType))
+    val put = new Put(toByteArray(fields(0), rowkeyType))
 
     List.range(1, fields.size) foreach {
       i => put.add(columns(i - 1).family, columns(i - 1).qualifier,
-        toByteArr(fields(i), columns(i - 1).typ))
+        toByteArray(fields(i), columns(i - 1).typ))
     }
 
     put

From c8d4e9e135bc7483eb972d0e1fba88f98200a337 Mon Sep 17 00:00:00 2001
From: haosdent
Date: Sat, 22 Mar 2014 23:21:31 +0800
Subject: [PATCH 03/11] Fix document error in log4j.properties

---
 external/hbase/src/test/resources/log4j.properties | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/external/hbase/src/test/resources/log4j.properties b/external/hbase/src/test/resources/log4j.properties
index 28606bc80f56c..ea56b9e41beb2 100644
--- a/external/hbase/src/test/resources/log4j.properties
+++ b/external/hbase/src/test/resources/log4j.properties
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-# Set everything to be logged to the file streaming/target/unit-tests.log
+# Set everything to be logged to the file external/hbase/target/unit-tests.log
 log4j.rootCategory=INFO, file
 # log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file=org.apache.log4j.FileAppender

From 4e9fff22429716ed4ec46356b0bec1eb885a48e3 Mon Sep 17 00:00:00 2001
From: haosdent
Date: Sun, 23 Mar 2014 00:58:31 +0800
Subject: [PATCH 04/11] Handle htable.close() exception scenario.
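
This change also documents the expected record layout
(rowkey|delimiter|column|delimiter|...). A record matching the documented
"0001,apple,banana" example could be built roughly like this (a sketch, not
code from this patch; the values are placeholders):

    import org.apache.hadoop.io.Text

    // rowkey "0001" followed by two column fields, delimiter ','
    val record = new Text(Seq("0001", "apple", "banana").mkString(","))

Records whose field count does not match the configured column list are now
logged with a warning and skipped.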
--- .../apache/spark/nosql/hbase/HBaseUtils.scala | 8 ++++ .../spark/nosql/hbase/SparkHBaseWriter.scala | 47 ++++++++++++------- 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala index d98a1c1800d2d..4341896bcc2ad 100644 --- a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala +++ b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala @@ -28,6 +28,13 @@ object HBaseUtils { /** * Save [[org.apache.spark.rdd.RDD[Text]]] as a HBase table + * + * The format of record in RDD should looks like this: + * rowkey|delimiter|column|delimiter|column|delimiter|... + * For example (if delimiter is ","): + * 0001,apple,banana + * "0001" is rowkey field while "apple" and "banana" are column fields. + * * @param rdd [[org.apache.spark.rdd.RDD[Text]]] * @param zkHost the zookeeper hosts. e.g. "10.232.98.10,10.232.98.11,10.232.98.12" * @param zkPort the zookeeper client listening port. e.g. "2181" @@ -46,6 +53,7 @@ object HBaseUtils { try { writer.init() + while (iter.hasNext) { val record = iter.next() writer.write(record) diff --git a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala index 87ea44f080dd0..4426b3d4525c3 100644 --- a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala +++ b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala @@ -24,14 +24,16 @@ import org.apache.commons.codec.binary.Hex import org.apache.hadoop.hbase.HConstants import org.apache.hadoop.conf.Configuration import java.io.IOException +import org.apache.spark.Logging /** * Internal helper class that saves an RDD using a HBase OutputFormat. This is only public - * because we need to access this class from the `spark` package to use some package-private HBase + * because we need to access this class from the `hbase` package to use some package-private HBase * functions, but this class should not be used directly by users. 
*/ private[hbase] -class SparkHBaseWriter(conf: HBaseConf) { +class SparkHBaseWriter(conf: HBaseConf) + extends Logging { private var htable: HTable = null @@ -72,35 +74,46 @@ class SparkHBaseWriter(conf: HBaseConf) { } /** - * Convert a string record to [[org.apache.hadoop.hbase.client.Put]] + * Convert a [[org.apache.hadoop.io.Text]] record to [[org.apache.hadoop.hbase.client.Put]] + * and put it to HBase * @param record * @return */ - def parseRecord(record: String) = { - val fields = record.split(delimiter) - val put = new Put(toByteArray(fields(0), rowkeyType)) + def write(record: Text) = { + val fields = record.toString.split(delimiter) - List.range(1, fields.size) foreach { - i => put.add(columns(i - 1).family, columns(i - 1).qualifier, - toByteArray(fields(i), columns(i - 1).typ)) - } + // Check the format of record + if (fields.size == columns.length + 1) { + val put = new Put(toByteArray(fields(0), rowkeyType)) - put - } + List.range(1, fields.size) foreach { + i => put.add(columns(i - 1).family, columns(i - 1).qualifier, + toByteArray(fields(i), columns(i - 1).typ)) + } - def write(record: Text) { - val put = parseRecord(record.toString) - htable.put(put) + htable.put(put) + } else { + logWarning(s"Record ($record) is unacceptable.") + } } def close() { - htable.close() + Option(htable) match { + case Some(t) => { + try { + t.close() + } catch { + case ex: Exception => logWarning("Close HBase table failed.", ex) + } + } + case None => logWarning("HBase table variable is null!") + } } } /** * Contains the types which supported to - * parse from [[java.lang.String]] into [[scala.Array[ B y t e]]] + * parse from [[java.lang.String]] into [[scala.Array[Byte]]] */ object HBaseType { val Boolean = "bool" From 1a020d17bdab2ad9cfd74af1a149cd1784da01de Mon Sep 17 00:00:00 2001 From: haosdent Date: Sun, 23 Mar 2014 01:23:23 +0800 Subject: [PATCH 05/11] Fix syntax errors in scala document. --- .../main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala index 4341896bcc2ad..ffdebcde577c9 100644 --- a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala +++ b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.io.Text import org.apache.spark.rdd.RDD /** - * A public object that provide HBase supports. + * A public object that provides HBase support. * You could save RDD into HBase through [[org.apache.spark.nosql.hbase.HBaseUtils.saveAsHBaseTable]] method. */ object HBaseUtils { @@ -29,7 +29,7 @@ object HBaseUtils { /** * Save [[org.apache.spark.rdd.RDD[Text]]] as a HBase table * - * The format of record in RDD should looks like this: + * The format of records in RDD should look like this: * rowkey|delimiter|column|delimiter|column|delimiter|... * For example (if delimiter is ","): * 0001,apple,banana From 4b4c5a799ed83a4cfb60ba08b59a6bef18e0b778 Mon Sep 17 00:00:00 2001 From: haosdent Date: Sun, 23 Mar 2014 01:27:26 +0800 Subject: [PATCH 06/11] Close htable and shutdown mini cluster in test case. 
--- .../test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala b/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala index dc5f46fc79285..d9b9d56a44706 100644 --- a/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala +++ b/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala @@ -44,5 +44,9 @@ class HBaseSuite extends FunSuite with LocalSparkContext { result = rs.next() i += 1 } + + rs.close() + htable.close() + util.shutdownMiniCluster() } } From 448f667915c5efb594ee71a90efd2f61185bba33 Mon Sep 17 00:00:00 2001 From: haosdent Date: Sun, 23 Mar 2014 01:38:11 +0800 Subject: [PATCH 07/11] Move try/catch block of close() out from SparkHBaseWriter --- .../org/apache/spark/nosql/hbase/HBaseUtils.scala | 10 ++++++++-- .../apache/spark/nosql/hbase/SparkHBaseWriter.scala | 10 ++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala index ffdebcde577c9..a6da7e8dab69c 100644 --- a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala +++ b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala @@ -19,12 +19,14 @@ package org.apache.spark.nosql.hbase import org.apache.hadoop.io.Text import org.apache.spark.rdd.RDD +import org.apache.spark.Logging /** * A public object that provides HBase support. * You could save RDD into HBase through [[org.apache.spark.nosql.hbase.HBaseUtils.saveAsHBaseTable]] method. */ -object HBaseUtils { +object HBaseUtils + extends Logging { /** * Save [[org.apache.spark.rdd.RDD[Text]]] as a HBase table @@ -59,7 +61,11 @@ object HBaseUtils { writer.write(record) } } finally { - writer.close() + try { + writer.close() + } catch { + case ex: Exception => logWarning("Close HBase table failed.", ex) + } } } diff --git a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala index 4426b3d4525c3..962412bcfac40 100644 --- a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala +++ b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala @@ -17,7 +17,7 @@ package org.apache.spark.nosql.hbase -import org.apache.hadoop.hbase.client.{Put, HTable} +import org.apache.hadoop.hbase.client.{HConnectionManager, Put, HTable} import org.apache.hadoop.io.Text import org.apache.hadoop.hbase.util.Bytes import org.apache.commons.codec.binary.Hex @@ -99,13 +99,7 @@ class SparkHBaseWriter(conf: HBaseConf) def close() { Option(htable) match { - case Some(t) => { - try { - t.close() - } catch { - case ex: Exception => logWarning("Close HBase table failed.", ex) - } - } + case Some(t) => t.close() case None => logWarning("HBase table variable is null!") } } From 67a48a80d0759ab97c916c5139e94ca2b0f7eff4 Mon Sep 17 00:00:00 2001 From: haosdent Date: Sun, 6 Apr 2014 20:47:42 +0800 Subject: [PATCH 08/11] Add methods to save SchemaRDD as HBase table. 
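
The new overload takes a SchemaRDD, uses its first column as the HBase rowkey
and writes the remaining columns under a single column family, with the
attribute names as qualifiers. A rough usage sketch (the SQL context, the
"people" table and the "cf" family are hypothetical; only the method signature
comes from this patch):

    import org.apache.hadoop.hbase.util.Bytes

    val people = sqlContext.sql("SELECT id, name FROM people")
    HBaseUtils.saveAsHBaseTable(people, "10.232.98.10,10.232.98.11", "2181",
      "hbase-apache", "people", Bytes.toBytes("cf"))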
--- external/hbase/pom.xml | 12 +++ .../apache/spark/nosql/hbase/HBaseUtils.scala | 60 +++++++++++- .../spark/nosql/hbase/SparkHBaseWriter.scala | 92 ++++++++++++------- .../apache/spark/nosql/hbase/HBaseSuite.scala | 67 ++++++++++++-- 4 files changed, 187 insertions(+), 44 deletions(-) diff --git a/external/hbase/pom.xml b/external/hbase/pom.xml index e73eb88c07396..bb57a64526dc4 100644 --- a/external/hbase/pom.xml +++ b/external/hbase/pom.xml @@ -45,6 +45,18 @@ test-jar test + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + test-jar + test + org.apache.hbase hbase diff --git a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala index a6da7e8dab69c..6604d7c279361 100644 --- a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala +++ b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala @@ -20,6 +20,9 @@ package org.apache.spark.nosql.hbase import org.apache.hadoop.io.Text import org.apache.spark.rdd.RDD import org.apache.spark.Logging +import org.apache.spark.sql.{Row, SchemaRDD} +import org.apache.spark.sql.catalyst.types.DataType +import org.apache.hadoop.hbase.util.Bytes /** * A public object that provides HBase support. @@ -42,12 +45,12 @@ object HBaseUtils * @param zkPort the zookeeper client listening port. e.g. "2181" * @param zkNode the zookeeper znode of HBase. e.g. "hbase-apache" * @param table the name of table which we save records - * @param rowkeyType the type of rowkey. [[org.apache.spark.nosql.hbase.HBaseType]] + * @param rowkeyType the type of rowkey. [[org.apache.spark.sql.catalyst.types.DataType]] * @param columns the column list. [[org.apache.spark.nosql.hbase.HBaseColumn]] * @param delimiter the delimiter which used to split record into fields */ def saveAsHBaseTable(rdd: RDD[Text], zkHost: String, zkPort: String, zkNode: String, - table: String, rowkeyType: String, columns: List[HBaseColumn], delimiter: Char) { + table: String, rowkeyType: DataType, columns: List[HBaseColumn], delimiter: Char) { val conf = new HBaseConf(zkHost, zkPort, zkNode, table, rowkeyType, columns, delimiter) def writeToHBase(iter: Iterator[Text]) { @@ -71,4 +74,57 @@ object HBaseUtils rdd.foreachPartition(writeToHBase) } + + /** + * Save [[org.apache.spark.sql.SchemaRDD]] as a HBase table + * + * The first field of Row would be save as rowkey in HBase. + * All fields of Row use the @param family as the column family. + * + * @param rdd [[org.apache.spark.sql.SchemaRDD]] + * @param zkHost the zookeeper hosts. e.g. "10.232.98.10,10.232.98.11,10.232.98.12" + * @param zkPort the zookeeper client listening port. e.g. "2181" + * @param zkNode the zookeeper znode of HBase. e.g. 
"hbase-apache" + * @param table the name of table which we save records + * @param family the fixed column family which we save records + */ + def saveAsHBaseTable(rdd: SchemaRDD, + zkHost: String, zkPort: String, zkNode: String, + table: String, family: Array[Byte]) { + // Convert attributes informations in SchemaRDD to List[HBaseColumn] + val attributes = rdd.logicalPlan.output + var i = 0 + // Assume first field in Row is rowkey + val rowkeyType = attributes(i).dataType + + var columns = List.empty[HBaseColumn] + for (i <- 1 to attributes.length - 1) { + val attribute = attributes(i) + val qualifier = Bytes.toBytes(attribute.name) + columns = columns :+ new HBaseColumn(family, qualifier, attribute.dataType) + } + + val conf = new HBaseConf(zkHost, zkPort, zkNode, table, rowkeyType, columns, ',') + + def writeToHBase(iter: Iterator[Row]) { + val writer = new SparkHBaseWriter(conf) + + try { + writer.init() + + while (iter.hasNext) { + val record = iter.next() + writer.write(record) + } + } finally { + try { + writer.close() + } catch { + case ex: Exception => logWarning("Close HBase table failed.", ex) + } + } + } + + rdd.foreachPartition(writeToHBase) + } } diff --git a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala index 962412bcfac40..a0d2b5471718f 100644 --- a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala +++ b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala @@ -17,7 +17,7 @@ package org.apache.spark.nosql.hbase -import org.apache.hadoop.hbase.client.{HConnectionManager, Put, HTable} +import org.apache.hadoop.hbase.client.{Put, HTable} import org.apache.hadoop.io.Text import org.apache.hadoop.hbase.util.Bytes import org.apache.commons.codec.binary.Hex @@ -25,6 +25,9 @@ import org.apache.hadoop.hbase.HConstants import org.apache.hadoop.conf.Configuration import java.io.IOException import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.types._ +import scala.Some +import org.apache.spark.sql.Row /** * Internal helper class that saves an RDD using a HBase OutputFormat. 
This is only public @@ -58,19 +61,37 @@ class SparkHBaseWriter(conf: HBaseConf) /** * Convert field to bytes * @param field split by delimiter from record - * @param kind the type of field + * @param dataType the type of field * @return */ - def toByteArray(field: String, kind: String) = kind match { - case HBaseType.Boolean => Bytes.toBytes(field.toBoolean) - case HBaseType.Short => Bytes.toBytes(field.toShort) - case HBaseType.Int => Bytes.toBytes(field.toInt) - case HBaseType.Long => Bytes.toBytes(field.toLong) - case HBaseType.Float => Bytes.toBytes(field.toFloat) - case HBaseType.Double => Bytes.toBytes(field.toDouble) - case HBaseType.String => Bytes.toBytes(field) - case HBaseType.Bytes => Hex.decodeHex(field.toCharArray) - case _ => throw new IOException("Unsupported data type.") + def toByteArray(field: String, dataType: DataType) = dataType match { + case BooleanType => Bytes.toBytes(field.toBoolean) + case ShortType => Bytes.toBytes(field.toShort) + case IntegerType => Bytes.toBytes(field.toInt) + case LongType => Bytes.toBytes(field.toLong) + case FloatType => Bytes.toBytes(field.toFloat) + case DoubleType => Bytes.toBytes(field.toDouble) + case StringType => Bytes.toBytes(field) + case BinaryType => Hex.decodeHex(field.toCharArray) + case _ => throw new IOException("Unsupported data type") + } + + /** + * Convert field to bytes + * @param row from [[org.apache.spark.sql.SchemaRDD]] + * @param index index of field in row + * @param dataType the type of field + * @return + */ + def toByteArray(row: Row, index: Int, dataType: DataType) = dataType match { + case BooleanType => Bytes.toBytes(row.getBoolean(index)) + case ShortType => Bytes.toBytes(row.getShort(index)) + case IntegerType => Bytes.toBytes(row.getInt(index)) + case LongType => Bytes.toBytes(row.getLong(index)) + case FloatType => Bytes.toBytes(row.getFloat(index)) + case DoubleType => Bytes.toBytes(row.getDouble(index)) + case StringType => Bytes.toBytes(row.getString(index)) + case _ => throw new IOException("Unsupported data type") } /** @@ -88,7 +109,29 @@ class SparkHBaseWriter(conf: HBaseConf) List.range(1, fields.size) foreach { i => put.add(columns(i - 1).family, columns(i - 1).qualifier, - toByteArray(fields(i), columns(i - 1).typ)) + toByteArray(fields(i), columns(i - 1).dataType)) + } + + htable.put(put) + } else { + logWarning(s"Record ($record) is unacceptable.") + } + } + + /** + * Convert a [[org.apache.spark.sql.Row]] record to [[org.apache.hadoop.hbase.client.Put]] + * and put it to HBase + * @param record + * @return + */ + def write(record: Row) = { + // Check the format of record + if (record.length == columns.length + 1) { + val put = new Put(toByteArray(record, 0, rowkeyType)) + + List.range(1, record.length) foreach { + i => put.add(columns(i - 1).family, columns(i - 1).qualifier, + toByteArray(record, i, columns(i - 1).dataType)) } htable.put(put) @@ -105,28 +148,13 @@ class SparkHBaseWriter(conf: HBaseConf) } } -/** - * Contains the types which supported to - * parse from [[java.lang.String]] into [[scala.Array[Byte]]] - */ -object HBaseType { - val Boolean = "bool" - val Short = "short" - val Int = "int" - val Long = "long" - val Float = "float" - val Double = "double" - val String = "string" - val Bytes = "bytes" -} - /** * A representation of HBase Column * @param family HBase Column Family * @param qualifier HBase Column Qualifier - * @param typ type [[org.apache.spark.nosql.hbase.HBaseType]] + * @param dataType type [[org.apache.spark.sql.catalyst.types.DataType]] */ -class HBaseColumn(val 
family: Array[Byte], val qualifier: Array[Byte], val typ: String) +class HBaseColumn(val family: Array[Byte], val qualifier: Array[Byte], val dataType: DataType) extends Serializable /** @@ -136,11 +164,11 @@ class HBaseColumn(val family: Array[Byte], val qualifier: Array[Byte], val typ: * @param zkPort the zookeeper client listening port. e.g. "2181" * @param zkNode the zookeeper znode of HBase. e.g. "hbase-apache" * @param table the name of table which we save records - * @param rowkeyType the type of rowkey. [[org.apache.spark.nosql.hbase.HBaseType]] + * @param rowkeyType the type of rowkey. [[org.apache.spark.sql.catalyst.types.DataType]] * @param columns the column list. [[org.apache.spark.nosql.hbase.HBaseColumn]] * @param delimiter the delimiter which used to split record into fields */ class HBaseConf(val zkHost: String, val zkPort: String, val zkNode: String, - val table: String, val rowkeyType: String, val columns: List[HBaseColumn], + val table: String, val rowkeyType: DataType, val columns: List[HBaseColumn], val delimiter: Char) extends Serializable \ No newline at end of file diff --git a/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala b/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala index d9b9d56a44706..c83fb6bfb1f1b 100644 --- a/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala +++ b/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala @@ -1,27 +1,41 @@ package org.apache.spark.nosql.hbase -import org.scalatest.FunSuite +import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.apache.hadoop.io.Text import org.apache.spark.{SparkContext, LocalSparkContext} import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{HConstants, HBaseTestingUtility} import org.apache.hadoop.hbase.client.{Scan, HTable} +import org.apache.spark.sql.catalyst.types.{FloatType, StringType} +import org.apache.spark.sql.{TestData, SchemaRDD} +import org.apache.spark.sql.test.TestSQLContext._ -class HBaseSuite extends FunSuite with LocalSparkContext { +class HBaseSuite + extends FunSuite + with LocalSparkContext + with BeforeAndAfterAll { - test("write SequenceFile using HBase") { - sc = new SparkContext("local", "test") + val util = new HBaseTestingUtility() + + override def beforeAll() { + util.startMiniCluster() + } + + override def afterAll() { + util.shutdownMiniCluster() + } + + test("save SequenceFile as HBase table") { + sc = new SparkContext("local", "test1") val nums = sc.makeRDD(1 to 3).map(x => new Text("a" + x + " 1.0")) - val table = "test" - val rowkeyType = HBaseType.String + val table = "test1" + val rowkeyType = StringType val cfBytes = Bytes.toBytes("cf") val qualBytes = Bytes.toBytes("qual0") - val columns = List[HBaseColumn](new HBaseColumn(cfBytes, qualBytes, HBaseType.Float)) + val columns = List[HBaseColumn](new HBaseColumn(cfBytes, qualBytes, FloatType)) val delimiter = ' ' - val util = new HBaseTestingUtility() - util.startMiniCluster() util.createTable(Bytes.toBytes(table), cfBytes) val conf = util.getConfiguration val zkHost = conf.get(HConstants.ZOOKEEPER_QUORUM) @@ -30,6 +44,7 @@ class HBaseSuite extends FunSuite with LocalSparkContext { HBaseUtils.saveAsHBaseTable(nums, zkHost, zkPort, zkNode, table, rowkeyType, columns, delimiter) + // Verify results val htable = new HTable(conf, table) val scan = new Scan() val rs = htable.getScanner(scan) @@ -47,6 +62,38 @@ class HBaseSuite extends FunSuite with LocalSparkContext { rs.close() htable.close() - 
util.shutdownMiniCluster() + } + + test("save SchemaRDD as HBase table") { + val table = "test2" + val cfBytes = Bytes.toBytes("cf") + + util.createTable(Bytes.toBytes(table), cfBytes) + val conf = util.getConfiguration + val zkHost = conf.get(HConstants.ZOOKEEPER_QUORUM) + val zkPort = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + val zkNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT) + + HBaseUtils.saveAsHBaseTable(TestData.lowerCaseData, zkHost, zkPort, zkNode, table, cfBytes) + + // Verify results + val htable = new HTable(conf, table) + val scan = new Scan() + val rs = htable.getScanner(scan) + val qualBytes = Bytes.toBytes("l") + + var result = rs.next() + var i = 1 + while (result != null) { + val rowkey = Bytes.toInt(result.getRow) + assert(rowkey == i) + val value = Bytes.toString(result.getValue(cfBytes, qualBytes)) + assert(('a' + i - 1).toChar + "" == value) + result = rs.next() + i += 1 + } + + rs.close() + htable.close() } } From 177958adf572a869515c53c5971f8ff850148e2b Mon Sep 17 00:00:00 2001 From: haosdent Date: Mon, 7 Apr 2014 01:15:40 +0800 Subject: [PATCH 09/11] Add deps in sbt. --- project/SparkBuild.scala | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 843a874fbfdb0..fcb3f983bcd06 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -137,8 +137,11 @@ object SparkBuild extends Build { lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings) .dependsOn(streaming % "compile->compile;test->test") - lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) - lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt) + lazy val externalHBase = Project("external-hbase", file("external/hbase"), settings = hbaseSettings) + .dependsOn(core % "compile->compile;test->test", sql % "compile->compile;test->test") + + lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt, externalHBase) + lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt, externalHBase) lazy val examples = Project("examples", file("examples"), settings = examplesSettings) .dependsOn(core, mllib, graphx, bagel, streaming, externalTwitter, hive) dependsOn(allExternal: _*) @@ -558,4 +561,15 @@ object SparkBuild extends Build { previousArtifact := sparkPreviousArtifact("spark-streaming-mqtt"), libraryDependencies ++= Seq("org.eclipse.paho" % "mqtt-client" % "0.4.0") ) + + def hbaseSettings() = sharedSettings ++ Seq( + name := "spark-nosql-hbase", + previousArtifact := sparkPreviousArtifact("spark-nosql-hbase"), + libraryDependencies ++= Seq( + "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging), + "org.apache.hbase" % "hbase" % HBASE_VERSION % "test" classifier "tests" excludeAll(excludeNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging), + "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeCommonsLogging, excludeSLF4J, excludeOldAsm), + "org.apache.hadoop" % "hadoop-test" % hadoopVersion % "test" excludeAll(excludeNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging) + ) + ) } From a09a6c23d5df4292b18cde4954e0f38f7e8b66c0 Mon Sep 17 00:00:00 2001 From: haosdent Date: Mon, 7 
Apr 2014 17:04:57 +0800
Subject: [PATCH 10/11] Fix scalastyle errors.

---
 .../scala/org/apache/spark/nosql/hbase/HBaseUtils.scala  | 8 +++++---
 .../org/apache/spark/nosql/hbase/SparkHBaseWriter.scala  | 3 ++-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala
index 6604d7c279361..275c4205abe40 100644
--- a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala
+++ b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala
@@ -26,7 +26,8 @@ import org.apache.hadoop.hbase.util.Bytes
 /**
  * A public object that provides HBase support.
- * You could save RDD into HBase through [[org.apache.spark.nosql.hbase.HBaseUtils.saveAsHBaseTable]] method.
+ * You could save RDD into HBase through
+ * [[org.apache.spark.nosql.hbase.HBaseUtils.saveAsHBaseTable]] method.
  */
 object HBaseUtils
   extends Logging {
@@ -49,8 +50,9 @@ object HBaseUtils
    * @param columns the column list. [[org.apache.spark.nosql.hbase.HBaseColumn]]
    * @param delimiter the delimiter which used to split record into fields
    */
-  def saveAsHBaseTable(rdd: RDD[Text], zkHost: String, zkPort: String, zkNode: String,
-                       table: String, rowkeyType: DataType, columns: List[HBaseColumn], delimiter: Char) {
+  def saveAsHBaseTable(rdd: RDD[Text],
+                       zkHost: String, zkPort: String, zkNode: String, table: String,
+                       rowkeyType: DataType, columns: List[HBaseColumn], delimiter: Char) {
     val conf = new HBaseConf(zkHost, zkPort, zkNode, table, rowkeyType, columns, delimiter)
 
     def writeToHBase(iter: Iterator[Text]) {
diff --git a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala
index a0d2b5471718f..145752c10d94d 100644
--- a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala
+++ b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala
@@ -171,4 +171,5 @@ class HBaseConf(val zkHost: String, val zkPort: String, val zkNode: String,
                 val table: String, val rowkeyType: DataType, val columns: List[HBaseColumn],
                 val delimiter: Char)
-  extends Serializable
\ No newline at end of file
+  extends Serializable
+

From 30f7343018d8d3bf8e8f1511a56511d35aaa025f Mon Sep 17 00:00:00 2001
From: haosdent
Date: Mon, 7 Apr 2014 18:59:34 +0800
Subject: [PATCH 11/11] Remove unnecessary imports.

---
 .../test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala b/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala
index c83fb6bfb1f1b..7b249de12b657 100644
--- a/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala
+++ b/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala
@@ -7,7 +7,7 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HConstants, HBaseTestingUtility}
 import org.apache.hadoop.hbase.client.{Scan, HTable}
 import org.apache.spark.sql.catalyst.types.{FloatType, StringType}
-import org.apache.spark.sql.{TestData, SchemaRDD}
+import org.apache.spark.sql.TestData
 import org.apache.spark.sql.test.TestSQLContext._

 class HBaseSuite