diff --git a/external/hbase/pom.xml b/external/hbase/pom.xml
new file mode 100644
index 0000000000000..bb57a64526dc4
--- /dev/null
+++ b/external/hbase/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-nosql-hbase</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project External HBase</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase</artifactId>
+      <version>${hbase.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-test</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala
new file mode 100644
index 0000000000000..275c4205abe40
--- /dev/null
+++ b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/HBaseUtils.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.nosql.hbase
+
+import org.apache.hadoop.io.Text
+import org.apache.spark.rdd.RDD
+import org.apache.spark.Logging
+import org.apache.spark.sql.{Row, SchemaRDD}
+import org.apache.spark.sql.catalyst.types.DataType
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * Provides HBase support.
+ * An RDD can be saved to an HBase table through the
+ * [[org.apache.spark.nosql.hbase.HBaseUtils.saveAsHBaseTable]] methods.
+ */
+object HBaseUtils
+  extends Logging {
+
+  /**
+   * Save an [[org.apache.spark.rdd.RDD[Text]]] as an HBase table.
+   *
+   * Each record in the RDD must have the format:
+   * rowkey|delimiter|column|delimiter|column|delimiter|...
+   * For example (with "," as the delimiter):
+   * 0001,apple,banana
+   * "0001" is the rowkey field, while "apple" and "banana" are column fields.
+   *
+   * @param rdd the [[org.apache.spark.rdd.RDD[Text]]] to save
+   * @param zkHost the ZooKeeper hosts, e.g. "10.232.98.10,10.232.98.11,10.232.98.12"
+   * @param zkPort the ZooKeeper client listening port, e.g. "2181"
+   * @param zkNode the ZooKeeper znode of HBase, e.g. "hbase-apache"
+   * @param table the name of the table to which records are saved
+   * @param rowkeyType the [[org.apache.spark.sql.catalyst.types.DataType]] of the rowkey
+   * @param columns the column list. See [[org.apache.spark.nosql.hbase.HBaseColumn]]
+   * @param delimiter the delimiter used to split a record into fields
+   */
+  def saveAsHBaseTable(rdd: RDD[Text],
+      zkHost: String, zkPort: String, zkNode: String, table: String,
+      rowkeyType: DataType, columns: List[HBaseColumn], delimiter: Char) {
+    val conf = new HBaseConf(zkHost, zkPort, zkNode, table, rowkeyType, columns, delimiter)
+
+    def writeToHBase(iter: Iterator[Text]) {
+      val writer = new SparkHBaseWriter(conf)
+
+      try {
+        writer.init()
+
+        while (iter.hasNext) {
+          val record = iter.next()
+          writer.write(record)
+        }
+      } finally {
+        try {
+          writer.close()
+        } catch {
+          case ex: Exception => logWarning("Close HBase table failed.", ex)
+        }
+      }
+    }
+
+    rdd.foreachPartition(writeToHBase)
+  }
+
+  /**
+   * Save a [[org.apache.spark.sql.SchemaRDD]] as an HBase table.
+   *
+   * The first field of each Row is saved as the rowkey in HBase.
+   * All other fields are saved as columns under the given `family`.
+   *
+   * @param rdd the [[org.apache.spark.sql.SchemaRDD]] to save
+   * @param zkHost the ZooKeeper hosts, e.g. "10.232.98.10,10.232.98.11,10.232.98.12"
+   * @param zkPort the ZooKeeper client listening port, e.g. "2181"
+   * @param zkNode the ZooKeeper znode of HBase, e.g. "hbase-apache"
+   * @param table the name of the table to which records are saved
+   * @param family the fixed column family under which records are saved
+   */
+  def saveAsHBaseTable(rdd: SchemaRDD,
+      zkHost: String, zkPort: String, zkNode: String,
+      table: String, family: Array[Byte]) {
+    // Convert the attribute information in the SchemaRDD to a List[HBaseColumn]
+    val attributes = rdd.logicalPlan.output
+    // Assume the first field in each Row is the rowkey
+    val rowkeyType = attributes(0).dataType
+
+    var columns = List.empty[HBaseColumn]
+    for (i <- 1 until attributes.length) {
+      val attribute = attributes(i)
+      val qualifier = Bytes.toBytes(attribute.name)
+      columns = columns :+ new HBaseColumn(family, qualifier, attribute.dataType)
+    }
+
+    val conf = new HBaseConf(zkHost, zkPort, zkNode, table, rowkeyType, columns, ',')
+
+    def writeToHBase(iter: Iterator[Row]) {
+      val writer = new SparkHBaseWriter(conf)
+
+      try {
+        writer.init()
+
+        while (iter.hasNext) {
+          val record = iter.next()
+          writer.write(record)
+        }
+      } finally {
+        try {
+          writer.close()
+        } catch {
+          case ex: Exception => logWarning("Close HBase table failed.", ex)
+        }
+      }
+    }
+
+    rdd.foreachPartition(writeToHBase)
+  }
+}
diff --git a/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala
new file mode 100644
index 0000000000000..145752c10d94d
--- /dev/null
+++ b/external/hbase/src/main/scala/org/apache/spark/nosql/hbase/SparkHBaseWriter.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.nosql.hbase
+
+import org.apache.hadoop.hbase.client.{Put, HTable}
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.commons.codec.binary.Hex
+import org.apache.hadoop.hbase.HConstants
+import org.apache.hadoop.conf.Configuration
+import java.io.IOException
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.Row
+
+/**
+ * Internal helper class that writes records to an HBase table through an
+ * [[org.apache.hadoop.hbase.client.HTable]]. It is private to the `hbase` package
+ * and should not be used directly by users.
+ */
+private[hbase]
+class SparkHBaseWriter(conf: HBaseConf)
+  extends Logging {
+
+  private var htable: HTable = null
+
+  val zkHost = conf.zkHost
+  val zkPort = conf.zkPort
+  val zkNode = conf.zkNode
+  val table = conf.table
+  val rowkeyType = conf.rowkeyType
+  val columns = conf.columns
+  val delimiter = conf.delimiter
+
+  def init() {
+    val conf = new Configuration()
+    conf.set(HConstants.ZOOKEEPER_QUORUM, zkHost)
+    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, zkPort)
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkNode)
+    htable = new HTable(conf, table)
+    // Rely on the default write buffer size so puts are submitted in batches
+    htable.setAutoFlush(false)
+  }
+
+  /**
+   * Convert a field to bytes.
+   * @param field a field split from a record by the delimiter
+   * @param dataType the type of the field
+   * @return the encoded byte array
+   */
+  def toByteArray(field: String, dataType: DataType): Array[Byte] = dataType match {
+    case BooleanType => Bytes.toBytes(field.toBoolean)
+    case ShortType => Bytes.toBytes(field.toShort)
+    case IntegerType => Bytes.toBytes(field.toInt)
+    case LongType => Bytes.toBytes(field.toLong)
+    case FloatType => Bytes.toBytes(field.toFloat)
+    case DoubleType => Bytes.toBytes(field.toDouble)
+    case StringType => Bytes.toBytes(field)
+    case BinaryType => Hex.decodeHex(field.toCharArray)
+    case _ => throw new IOException("Unsupported data type")
+  }
+
+  /**
+   * Convert a field of a [[org.apache.spark.sql.Row]] to bytes.
+   * @param row a row from an [[org.apache.spark.sql.SchemaRDD]]
+   * @param index the index of the field in the row
+   * @param dataType the type of the field
+   * @return the encoded byte array
+   */
+  def toByteArray(row: Row, index: Int, dataType: DataType): Array[Byte] = dataType match {
+    case BooleanType => Bytes.toBytes(row.getBoolean(index))
+    case ShortType => Bytes.toBytes(row.getShort(index))
+    case IntegerType => Bytes.toBytes(row.getInt(index))
+    case LongType => Bytes.toBytes(row.getLong(index))
+    case FloatType => Bytes.toBytes(row.getFloat(index))
+    case DoubleType => Bytes.toBytes(row.getDouble(index))
+    case StringType => Bytes.toBytes(row.getString(index))
+    case _ => throw new IOException("Unsupported data type")
+  }
+
+  /**
+   * Convert a [[org.apache.hadoop.io.Text]] record to a [[org.apache.hadoop.hbase.client.Put]]
+   * and write it to HBase.
+   * @param record the record to write
+   */
+  def write(record: Text) = {
+    val fields = record.toString.split(delimiter)
+
+    // Check the format of the record
+    if (fields.size == columns.length + 1) {
+      val put = new Put(toByteArray(fields(0),
+        rowkeyType))
+
+      List.range(1, fields.size) foreach {
+        i => put.add(columns(i - 1).family, columns(i - 1).qualifier,
+          toByteArray(fields(i), columns(i - 1).dataType))
+      }
+
+      htable.put(put)
+    } else {
+      logWarning(s"Record ($record) does not match the expected format; skipping it.")
+    }
+  }
+
+  /**
+   * Convert a [[org.apache.spark.sql.Row]] record to a [[org.apache.hadoop.hbase.client.Put]]
+   * and write it to HBase.
+   * @param record the record to write
+   */
+  def write(record: Row) = {
+    // Check the format of the record
+    if (record.length == columns.length + 1) {
+      val put = new Put(toByteArray(record, 0, rowkeyType))
+
+      List.range(1, record.length) foreach {
+        i => put.add(columns(i - 1).family, columns(i - 1).qualifier,
+          toByteArray(record, i, columns(i - 1).dataType))
+      }
+
+      htable.put(put)
+    } else {
+      logWarning(s"Record ($record) does not match the expected format; skipping it.")
+    }
+  }
+
+  def close() {
+    Option(htable) match {
+      case Some(t) => t.close()
+      case None => logWarning("HBase table variable is null!")
+    }
+  }
+}
+
+/**
+ * A representation of an HBase column.
+ * @param family the HBase column family
+ * @param qualifier the HBase column qualifier
+ * @param dataType the [[org.apache.spark.sql.catalyst.types.DataType]] of the column
+ */
+class HBaseColumn(val family: Array[Byte], val qualifier: Array[Byte], val dataType: DataType)
+  extends Serializable
+
+/**
+ * A representation of the HBase configuration.
+ * It contains the parameters used to create a connection to the HBase servers.
+ * @param zkHost the ZooKeeper hosts, e.g. "10.232.98.10,10.232.98.11,10.232.98.12"
+ * @param zkPort the ZooKeeper client listening port, e.g. "2181"
+ * @param zkNode the ZooKeeper znode of HBase, e.g. "hbase-apache"
+ * @param table the name of the table to which records are saved
+ * @param rowkeyType the [[org.apache.spark.sql.catalyst.types.DataType]] of the rowkey
+ * @param columns the column list. See [[org.apache.spark.nosql.hbase.HBaseColumn]]
+ * @param delimiter the delimiter used to split a record into fields
+ */
+class HBaseConf(val zkHost: String, val zkPort: String, val zkNode: String,
+    val table: String, val rowkeyType: DataType, val columns: List[HBaseColumn],
+    val delimiter: Char)
+  extends Serializable
+
diff --git a/external/hbase/src/test/resources/log4j.properties b/external/hbase/src/test/resources/log4j.properties
new file mode 100644
index 0000000000000..ea56b9e41beb2
--- /dev/null
+++ b/external/hbase/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file external/hbase/target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=false
+log4j.appender.file.file=external/hbase/target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.eclipse.jetty=WARN
+
diff --git a/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala b/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala
new file mode 100644
index 0000000000000..7b249de12b657
--- /dev/null
+++ b/external/hbase/src/test/scala/org/apache/spark/nosql/hbase/HBaseSuite.scala
@@ -0,0 +1,99 @@
+package org.apache.spark.nosql.hbase
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.apache.hadoop.io.Text
+import org.apache.spark.{SparkContext, LocalSparkContext}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{HConstants, HBaseTestingUtility}
+import org.apache.hadoop.hbase.client.{Scan, HTable}
+import org.apache.spark.sql.catalyst.types.{FloatType, StringType}
+import org.apache.spark.sql.TestData
+import org.apache.spark.sql.test.TestSQLContext._
+
+class HBaseSuite
+  extends FunSuite
+  with LocalSparkContext
+  with BeforeAndAfterAll {
+
+  val util = new HBaseTestingUtility()
+
+  override def beforeAll() {
+    util.startMiniCluster()
+  }
+
+  override def afterAll() {
+    util.shutdownMiniCluster()
+  }
+
+  test("save RDD[Text] as HBase table") {
+    sc = new SparkContext("local", "test1")
+    val nums = sc.makeRDD(1 to 3).map(x => new Text("a" + x + " 1.0"))
+
+    val table = "test1"
+    val rowkeyType = StringType
+    val cfBytes = Bytes.toBytes("cf")
+    val qualBytes = Bytes.toBytes("qual0")
+    val columns = List[HBaseColumn](new HBaseColumn(cfBytes, qualBytes, FloatType))
+    val delimiter = ' '
+
+    util.createTable(Bytes.toBytes(table), cfBytes)
+    val conf = util.getConfiguration
+    val zkHost = conf.get(HConstants.ZOOKEEPER_QUORUM)
+    val zkPort = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
+    val zkNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)
+
+    HBaseUtils.saveAsHBaseTable(nums, zkHost, zkPort, zkNode, table, rowkeyType, columns, delimiter)
+
+    // Verify results
+    val htable = new HTable(conf, table)
+    val scan = new Scan()
+    val rs = htable.getScanner(scan)
+
+    var result = rs.next()
+    var i = 1
+    while (result != null) {
+      val rowkey = Bytes.toString(result.getRow)
+      assert(rowkey == "a" + i)
+      val value = Bytes.toFloat(result.getValue(cfBytes, qualBytes))
+      assert(value == 1.0)
+      result = rs.next()
+      i += 1
+    }
+
+    rs.close()
+    htable.close()
+  }
+
+  test("save SchemaRDD as HBase table") {
+    val table = "test2"
+    val cfBytes = Bytes.toBytes("cf")
+
+    util.createTable(Bytes.toBytes(table), cfBytes)
+    val conf = util.getConfiguration
+    val zkHost = conf.get(HConstants.ZOOKEEPER_QUORUM)
+    val zkPort = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
+    val zkNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)
+
+    HBaseUtils.saveAsHBaseTable(TestData.lowerCaseData, zkHost, zkPort, zkNode, table, cfBytes)
+
+    // Verify results
+    val htable = new HTable(conf, table)
+    val scan = new Scan()
+    val rs = htable.getScanner(scan)
+    val qualBytes = Bytes.toBytes("l")
+
+    var result = rs.next()
+    var i = 1
+    while (result != null) {
+      val rowkey =
+        Bytes.toInt(result.getRow)
+      assert(rowkey == i)
+      val value = Bytes.toString(result.getValue(cfBytes, qualBytes))
+      assert(('a' + i - 1).toChar + "" == value)
+      result = rs.next()
+      i += 1
+    }
+
+    rs.close()
+    htable.close()
+  }
+}
diff --git a/pom.xml b/pom.xml
index 01341d21b7f23..a5fb8644c80c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,6 +101,7 @@
     <module>external/flume</module>
     <module>external/zeromq</module>
     <module>external/mqtt</module>
+    <module>external/hbase</module>
     <module>examples</module>
   </modules>
 
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 843a874fbfdb0..fcb3f983bcd06 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -137,8 +137,11 @@ object SparkBuild extends Build {
   lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings)
     .dependsOn(streaming % "compile->compile;test->test")
 
-  lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
-  lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
+  lazy val externalHBase = Project("external-hbase", file("external/hbase"), settings = hbaseSettings)
+    .dependsOn(core % "compile->compile;test->test", sql % "compile->compile;test->test")
+
+  lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt, externalHBase)
+  lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt, externalHBase)
 
   lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
     .dependsOn(core, mllib, graphx, bagel, streaming, externalTwitter, hive) dependsOn(allExternal: _*)
@@ -558,4 +561,15 @@ object SparkBuild extends Build {
     previousArtifact := sparkPreviousArtifact("spark-streaming-mqtt"),
     libraryDependencies ++= Seq("org.eclipse.paho" % "mqtt-client" % "0.4.0")
   )
+
+  def hbaseSettings() = sharedSettings ++ Seq(
+    name := "spark-nosql-hbase",
+    previousArtifact := sparkPreviousArtifact("spark-nosql-hbase"),
+    libraryDependencies ++= Seq(
+      "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging),
+      "org.apache.hbase" % "hbase" % HBASE_VERSION % "test" classifier "tests" excludeAll(excludeNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging),
+      "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeCommonsLogging, excludeSLF4J, excludeOldAsm),
+      "org.apache.hadoop" % "hadoop-test" % hadoopVersion % "test" excludeAll(excludeNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging)
+    )
+  )
 }
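
Example usage (not part of the patch): a minimal sketch of how a driver program could call the new RDD[Text] overload of saveAsHBaseTable. The ZooKeeper quorum, port, parent znode, table name "demo" and column family "cf" below are placeholders; they assume an already-running HBase cluster in which the table and family have been created beforehand.

import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.Text
import org.apache.spark.SparkContext
import org.apache.spark.nosql.hbase.{HBaseColumn, HBaseUtils}
import org.apache.spark.sql.catalyst.types.{FloatType, StringType}

object TextRddToHBaseExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "text-rdd-to-hbase")

    // Records follow the documented layout: rowkey<delimiter>column<delimiter>...
    // Here each record has a String rowkey and a single Float column.
    val records = sc.parallelize(Seq("row1 1.5", "row2 2.5", "row3 3.5")).map(new Text(_))

    // One column stored as cf:qual0, encoded as a Float.
    val columns = List(new HBaseColumn(Bytes.toBytes("cf"), Bytes.toBytes("qual0"), FloatType))

    // Placeholder connection settings: the quorum hosts, client port and parent znode
    // must match the target cluster, and the table "demo" must already exist.
    HBaseUtils.saveAsHBaseTable(records, "zk1,zk2,zk3", "2181", "/hbase",
      "demo", StringType, columns, ' ')

    sc.stop()
  }
}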
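A similar sketch for the SchemaRDD overload, assuming the Spark 1.0-era SQL API where importing sqlContext.createSchemaRDD implicitly converts an RDD of case classes to a SchemaRDD. The first field of the schema ("key") becomes the rowkey and the remaining fields become qualifiers under the given family; the case class, table name "demo2" and connection settings are again illustrative placeholders.

import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.nosql.hbase.HBaseUtils
import org.apache.spark.sql.SQLContext

// First field ("key") becomes the rowkey; "value" becomes the qualifier of one column.
case class KeyValue(key: Int, value: String)

object SchemaRddToHBaseExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "schema-rdd-to-hbase")
    val sqlContext = new SQLContext(sc)
    // Implicitly converts RDD[KeyValue] to a SchemaRDD where one is expected (Spark 1.0 API).
    import sqlContext.createSchemaRDD

    val data = sc.parallelize(1 to 3).map(i => KeyValue(i, "val_" + i))

    // Placeholder connection settings; the table "demo2" with family "cf" must already exist.
    HBaseUtils.saveAsHBaseTable(data, "zk1,zk2,zk3", "2181", "/hbase",
      "demo2", Bytes.toBytes("cf"))

    sc.stop()
  }
}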