diff --git a/core/pom.xml b/core/pom.xml
index 4d7d41a9714d7..9432f9df82937 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -36,6 +36,21 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-test</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>net.java.dev.jets3t</groupId>
       <artifactId>jets3t</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/SparkHBaseWriter.scala b/core/src/main/scala/org/apache/spark/SparkHBaseWriter.scala
new file mode 100644
index 0000000000000..2a9ae79c71823
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/SparkHBaseWriter.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.hadoop.hbase.client.{Put, HTable}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.commons.codec.binary.Hex
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.hbase.HConstants
+
+/**
+ * Internal helper class that writes the records of an RDD to an HBase table. It lives in the
+ * `org.apache.spark` package so that `PairRDDFunctions.saveAsHBaseTable` can use it; it should
+ * not be used directly by users.
+ *
+ * Each record is a delimited string: the first field becomes the row key and the remaining
+ * fields are written to the configured columns, encoded according to their declared types.
+ */
+private[apache]
+class SparkHBaseWriter(conf: HBaseConf)
+  extends Logging {
+
+  private var htable: HTable = null
+
+  val zkHost = conf.zkHost
+  val zkPort = conf.zkPort
+  val zkNode = conf.zkNode
+  val table = conf.table
+  val rowkeyType = conf.rowkeyType
+  val columns = conf.columns
+  val delimiter = conf.delimiter
+
+  def setup() {
+    val conf = new Configuration()
+    conf.set(HConstants.ZOOKEEPER_QUORUM, zkHost)
+    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, zkPort)
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkNode)
+    htable = new HTable(conf, table)
+  }
+
+  def toByteArr(field: String, kind: String) = kind match {
+    case HBaseType.Boolean => Bytes.toBytes(field.toBoolean)
+    case HBaseType.Int => Bytes.toBytes(field.toInt)
+    case HBaseType.Long => Bytes.toBytes(field.toLong)
+    case HBaseType.Float => Bytes.toBytes(field.toFloat)
+    case HBaseType.Double => Bytes.toBytes(field.toDouble)
+    case HBaseType.String => Bytes.toBytes(field)
+    case HBaseType.Bytes => Hex.decodeHex(field.toCharArray)
+  }
+
+  def parseRecord(record: String) = {
+    val fields = record.split(delimiter)
+    val put = new Put(toByteArr(fields(0), rowkeyType))
+
+    List.range(1, fields.size) foreach {
+      i => put.add(columns(i - 1).family, columns(i - 1).qualifier, toByteArr(fields(i), columns(i - 1).typ))
+    }
+
+    put
+  }
+
+  def write(record: Text) {
+    val put = parseRecord(record.toString)
+    htable.put(put)
+  }
+
+  def close() {
+    htable.close()
+  }
+}
+
+private[apache]
+object HBaseType {
+  val Boolean = "bool"
+  val Int = "int"
+  val Long = "long"
+  val Float = "float"
+  val Double = "double"
+  val String = "string"
+  val Bytes = "bytes"
+}
+
+private[apache]
+class HBaseColumn(val family: Array[Byte], val qualifier: Array[Byte], val typ: String)
+  extends Serializable
+
+private[apache]
+class HBaseConf(val zkHost: String, val zkPort: String, val zkNode: String,
+    val table: String, val rowkeyType: String, val columns: List[HBaseColumn], val delimiter: Char)
+  extends Serializable
\ No newline at end of file
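
Note: the following is a minimal, standalone sketch of the record-to-Put mapping that `SparkHBaseWriter.parseRecord` performs, written directly against the HBase client API so it can be tried outside Spark. The record contents, the column family `cf`, and the qualifier `qual0` are illustrative assumptions, not values taken from this patch.

```scala
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

object PutMappingSketch {
  def main(args: Array[String]) {
    // Hypothetical delimited record with rowkeyType = "string" and one float column cf:qual0.
    val record = "row1 3.14"
    val fields = record.split(' ')

    // The first field becomes the row key; each remaining field is encoded per its column type.
    val put = new Put(Bytes.toBytes(fields(0)))
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("qual0"), Bytes.toBytes(fields(1).toFloat))

    // SparkHBaseWriter.write would hand this Put to HTable.put; here we just print it.
    println(put)
  }
}
```
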
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 2384c8f2b6fd4..02b3f51a37652 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
+import org.apache.hadoop.io.Text
 
 // SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
 import org.apache.hadoop.mapred.SparkHadoopWriter
@@ -746,6 +747,29 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     writer.commitJob()
   }
 
+  /**
+   * Write the values of each (key, value) pair in this RDD to an HBase table. Each value is
+   * expected to be a delimited [[org.apache.hadoop.io.Text]] record whose first field is the
+   * row key and whose remaining fields match the given column definitions.
+   */
+  def saveAsHBaseTable(zkHost: String, zkPort: String, zkNode: String,
+      table: String, rowkeyType: String, columns: List[HBaseColumn], delimiter: Char) {
+    val conf = new HBaseConf(zkHost, zkPort, zkNode, table, rowkeyType, columns, delimiter)
+
+    def writeToHBase(context: TaskContext, iter: Iterator[(K, V)]) {
+      val writer = new SparkHBaseWriter(conf)
+      writer.setup()
+
+      while (iter.hasNext) {
+        val record = iter.next()
+        writer.write(record._2.asInstanceOf[Text])
+      }
+      writer.close()
+    }
+
+    self.context.runJob(self, writeToHBase _)
+  }
+
   /**
    * Return an RDD with the keys of each tuple.
    */
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 76173608e9f70..d735284109194 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapred.FileAlreadyExistsException
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{HConstants, HBaseTestingUtility}
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
 class FileSuite extends FunSuite with LocalSparkContext {
@@ -197,6 +199,27 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
+  test("write SequenceFile using HBase") {
+    sc = new SparkContext("local", "test")
+    val nums = sc.makeRDD(1 to 3).map(x => (x, new Text("a" + x + " 1.0")))
+
+    val table = "test"
+    val rowkeyType = HBaseType.String
+    val cfBytes = Bytes.toBytes("cf")
+    val columns = List[HBaseColumn](new HBaseColumn(cfBytes, Bytes.toBytes("qual0"), HBaseType.Float))
+    val delimiter = ' '
+
+    val util = new HBaseTestingUtility()
+    util.startMiniCluster()
+    util.createTable(Bytes.toBytes(table), cfBytes)
+    val conf = util.getConfiguration
+    val zkHost = conf.get(HConstants.ZOOKEEPER_QUORUM)
+    val zkPort = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
+    val zkNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)
+
+    nums.saveAsHBaseTable(zkHost, zkPort, zkNode, table, rowkeyType, columns, delimiter)
+  }
+
   test("file caching") {
     sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
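
Note: the test above writes the RDD but never reads anything back or stops the mini cluster. A possible follow-up check is sketched below, assuming it is appended to the end of the test body and reuses its `conf`, `table`, `cfBytes`, and `util` vals; per the test data, the first record should land under row key `a1` with `cf:qual0` holding the float 1.0. This is a suggestion, not part of the patch.

```scala
import org.apache.hadoop.hbase.client.{Get, HTable}

// Read the first record back and verify the encoded float value, then tear down the mini cluster.
val htable = new HTable(conf, table)
val result = htable.get(new Get(Bytes.toBytes("a1")))
assert(Bytes.toFloat(result.getValue(cfBytes, Bytes.toBytes("qual0"))) === 1.0f)
htable.close()
util.shutdownMiniCluster()
```
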
diff --git a/pom.xml b/pom.xml
index f0c877dcfe7b2..bef5079d5811b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -464,6 +464,24 @@
       </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase</artifactId>
+        <version>${hbase.version}</version>
+        <type>test-jar</type>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase</artifactId>
+        <version>${hbase.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-test</artifactId>
+        <version>${hadoop.version}</version>
+        <scope>test</scope>
+      </dependency>
       <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro</artifactId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 8fa220c413291..540a131ce6af5 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -91,6 +91,8 @@ object SparkBuild extends Build {
   lazy val hadoopClient = if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core" else "hadoop-client"
   val maybeAvro = if (hadoopVersion.startsWith("0.23.") && isYarnEnabled) Seq("org.apache.avro" % "avro" % "1.7.4") else Seq()
 
+  lazy val hbaseVersion = Properties.envOrElse("SPARK_HBASE_VERSION", HBASE_VERSION)
+
   // Conditionally include the java 8 sub-project
   lazy val javaVersion = System.getProperty("java.specification.version")
   lazy val isJava8Enabled = javaVersion.toDouble >= "1.8".toDouble
@@ -292,6 +294,9 @@ object SparkBuild extends Build {
        "net.java.dev.jets3t" % "jets3t" % "0.7.1" excludeAll(excludeCommonsLogging),
        "org.apache.derby" % "derby" % "10.4.2.0" % "test",
        "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeCommonsLogging, excludeSLF4J, excludeOldAsm),
+       "org.apache.hadoop" % "hadoop-test" % hadoopVersion % "test",
+       "org.apache.hbase" % "hbase" % hbaseVersion,
+       "org.apache.hbase" % "hbase" % hbaseVersion % "test" classifier "tests",
        "org.apache.curator" % "curator-recipes" % "2.4.0" excludeAll(excludeNetty),
        "com.codahale.metrics" % "metrics-core" % "3.0.0",
        "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
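
Note: for reference, the same dependency trio as it might look in a downstream sbt build that compiles against this patch. The version numbers below are placeholders, not values taken from this diff; the patch itself resolves them from `hadoopVersion`, `hbaseVersion`, and the `SPARK_HBASE_VERSION` environment variable.

```scala
// Hypothetical excerpt from a downstream build.sbt; versions are placeholders.
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-test" % "1.0.4"  % "test",
  "org.apache.hbase"  % "hbase"       % "0.94.6",
  "org.apache.hbase"  % "hbase"       % "0.94.6" % "test" classifier "tests"
)
```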