diff --git a/sql/hbase/src/main/java/org/apache/spark/sql/hbase/SparkImmutableBytesWritable.java b/sql/hbase/src/main/java/org/apache/spark/sql/hbase/SparkImmutableBytesWritable.java new file mode 100644 index 0000000000000..59691e6f7fadf --- /dev/null +++ b/sql/hbase/src/main/java/org/apache/spark/sql/hbase/SparkImmutableBytesWritable.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hbase; + +import java.io.IOException; +import java.io.DataInput; +import java.io.DataOutput; +import java.util.Arrays; +import java.util.List; +import scala.Serializable; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; + +/** + * copy from hbase for Serializable issue + */ +public class SparkImmutableBytesWritable + implements WritableComparable, Serializable { + private byte[] bytes; + private int offset; + private int length; + + /** + * Create a zero-size sequence. + */ + public SparkImmutableBytesWritable() { + super(); + } + + /** + * Create a ImmutableBytesWritable using the byte array as the initial value. + * @param bytes This array becomes the backing storage for the object. + */ + public SparkImmutableBytesWritable(byte[] bytes) { + this(bytes, 0, bytes.length); + } + + /** + * Set the new ImmutableBytesWritable to the contents of the passed + * ibw. + * @param ibw the value to set this ImmutableBytesWritable to. + */ + public SparkImmutableBytesWritable(final SparkImmutableBytesWritable ibw) { + this(ibw.get(), ibw.getOffset(), ibw.getLength()); + } + + /** + * Set the value to a given byte range + * @param bytes the new byte range to set to + * @param offset the offset in newData to start at + * @param length the number of bytes in the range + */ + public SparkImmutableBytesWritable(final byte[] bytes, final int offset, + final int length) { + this.bytes = bytes; + this.offset = offset; + this.length = length; + } + + /** + * Get the data from the BytesWritable. + * @return The data is only valid between offset and offset+length. + */ + public byte [] get() { + if (this.bytes == null) { + throw new IllegalStateException("Uninitialiized. Null constructor " + + "called w/o accompaying readFields invocation"); + } + return this.bytes; + } + + /** + * @param b Use passed bytes as backing array for this instance. + */ + public void set(final byte [] b) { + set(b, 0, b.length); + } + + /** + * @param b Use passed bytes as backing array for this instance. 
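+   *          Only the range [offset, offset + length) is treated as valid; no copy of the array is made.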
+ * @param offset + * @param length + */ + public void set(final byte [] b, final int offset, final int length) { + this.bytes = b; + this.offset = offset; + this.length = length; + } + + /** + * @return the number of valid bytes in the buffer + * @deprecated use {@link #getLength()} instead + */ + @Deprecated + public int getSize() { + if (this.bytes == null) { + throw new IllegalStateException("Uninitialiized. Null constructor " + + "called w/o accompaying readFields invocation"); + } + return this.length; + } + + /** + * @return the number of valid bytes in the buffer + */ + public int getLength() { + if (this.bytes == null) { + throw new IllegalStateException("Uninitialiized. Null constructor " + + "called w/o accompaying readFields invocation"); + } + return this.length; + } + + /** + * @return offset + */ + public int getOffset(){ + return this.offset; + } + + public void readFields(final DataInput in) throws IOException { + this.length = in.readInt(); + this.bytes = new byte[this.length]; + in.readFully(this.bytes, 0, this.length); + this.offset = 0; + } + + public void write(final DataOutput out) throws IOException { + out.writeInt(this.length); + out.write(this.bytes, this.offset, this.length); + } + + // Below methods copied from BytesWritable + @Override + public int hashCode() { + int hash = 1; + for (int i = offset; i < offset + length; i++) + hash = (31 * hash) + (int)bytes[i]; + return hash; + } + + /** + * Define the sort order of the BytesWritable. + * @param that The other bytes writable + * @return Positive if left is bigger than right, 0 if they are equal, and + * negative if left is smaller than right. + */ + public int compareTo(SparkImmutableBytesWritable that) { + return WritableComparator.compareBytes( + this.bytes, this.offset, this.length, + that.bytes, that.offset, that.length); + } + + /** + * Compares the bytes in this object to the specified byte array + * @param that + * @return Positive if left is bigger than right, 0 if they are equal, and + * negative if left is smaller than right. + */ + public int compareTo(final byte [] that) { + return WritableComparator.compareBytes( + this.bytes, this.offset, this.length, + that, 0, that.length); + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object right_obj) { + if (right_obj instanceof byte []) { + return compareTo((byte [])right_obj) == 0; + } + if (right_obj instanceof SparkImmutableBytesWritable) { + return compareTo((SparkImmutableBytesWritable)right_obj) == 0; + } + return false; + } + + /** + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(3*this.length); + final int endIdx = this.offset + this.length; + for (int idx = this.offset; idx < endIdx ; idx++) { + sb.append(' '); + String num = Integer.toHexString(0xff & this.bytes[idx]); + // if it is only one digit, add a leading 0. + if (num.length() < 2) { + sb.append('0'); + } + sb.append(num); + } + return sb.length() > 0 ? sb.substring(1) : ""; + } + + /** A Comparator optimized for ImmutableBytesWritable. 
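+   * It delegates byte-level comparison to {@link BytesWritable.Comparator} and is registered
+   * for this class via {@code WritableComparator.define} in the static initializer below.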
+ */ + public static class Comparator extends WritableComparator { + private BytesWritable.Comparator comparator = + new BytesWritable.Comparator(); + + /** constructor */ + public Comparator() { + super(SparkImmutableBytesWritable.class); + } + + /** + * @see org.apache.hadoop.io.WritableComparator#compare(byte[], int, int, byte[], int, int) + */ + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + return comparator.compare(b1, s1, l1, b2, s2, l2); + } + } + + static { // register this comparator + WritableComparator.define(SparkImmutableBytesWritable.class, new Comparator()); + } + + /** + * @param array List of byte []. + * @return Array of byte []. + */ + public static byte [][] toArray(final List array) { + // List#toArray doesn't work on lists of byte []. + byte[][] results = new byte[array.size()][]; + for (int i = 0; i < array.size(); i++) { + results[i] = array.get(i); + } + return results; + } + + /** + * Returns a copy of the bytes referred to by this writable + */ + public byte[] copyBytes() { + return Arrays.copyOfRange(bytes, offset, offset+length); + } +} diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala new file mode 100644 index 0000000000000..1a92f321000a2 --- /dev/null +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBasePartitioner.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hbase + +import java.io.{ObjectInputStream, ObjectOutputStream, IOException} +import scala.Array +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkEnv +import org.apache.spark.Partitioner +import org.apache.spark.util.{Utils, CollectionsUtils} +import org.apache.spark.serializer.JavaSerializer +import org.apache.hadoop.hbase.client.HTable + +class HBasePartitioner [K : Ordering : ClassTag, V]( + @transient rdd: RDD[_ <: Product2[K,V]])(splitKeys: Array[K]) + extends Partitioner { + + private var ordering = implicitly[Ordering[K]] + + private var rangeBounds: Array[K] = splitKeys + + def numPartitions = rangeBounds.length + 1 + + private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K] + + // todo: test this + def getPartition(key: Any): Int = { + val k = key.asInstanceOf[K] + var partition = 0 + if (rangeBounds.length <= 128) { + // If we have less than 128 partitions naive search + while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { + partition += 1 + } + } else { + // Determine which binary search method to use only once. 
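+        // binarySearch is built once per partitioner (above, via CollectionsUtils.makeBinarySearch),
+        // so each key costs a single O(log n) lookup over the sorted region start keys.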
+ partition = binarySearch(rangeBounds, k) + // binarySearch either returns the match location or -[insertion point]-1 + if (partition < 0) { + partition = -partition-1 + } + if (partition > rangeBounds.length) { + partition = rangeBounds.length + } + } + partition + } + + override def equals(other: Any): Boolean = other match { + case r: HBasePartitioner[_,_] => + r.rangeBounds.sameElements(rangeBounds) + case _ => + false + } + + override def hashCode(): Int = { + val prime = 31 + var result = 1 + var i = 0 + while (i < rangeBounds.length) { + result = prime * result + rangeBounds(i).hashCode + i += 1 + } + result = prime * result + result + } + + @throws(classOf[IOException]) + private def writeObject(out: ObjectOutputStream) { + val sfactory = SparkEnv.get.serializer + sfactory match { + case js: JavaSerializer => out.defaultWriteObject() + case _ => + out.writeObject(ordering) + out.writeObject(binarySearch) + + val ser = sfactory.newInstance() + Utils.serializeViaNestedStream(out, ser) { stream => + stream.writeObject(scala.reflect.classTag[Array[K]]) + stream.writeObject(rangeBounds) + } + } + } + + @throws(classOf[IOException]) + private def readObject(in: ObjectInputStream) { + val sfactory = SparkEnv.get.serializer + sfactory match { + case js: JavaSerializer => in.defaultReadObject() + case _ => + ordering = in.readObject().asInstanceOf[Ordering[K]] + binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int] + + val ser = sfactory.newInstance() + Utils.deserializeViaNestedStream(in, ser) { ds => + implicit val classTag = ds.readObject[ClassTag[Array[K]]]() + rangeBounds = ds.readObject[Array[K]]() + } + } + } +} + +//Todo: test this +object HBasePartitioner { + implicit def orderingRowKey[SparkImmutableBytesWritable]: Ordering[SparkImmutableBytesWritable] = + OrderingRowKey.asInstanceOf[Ordering[SparkImmutableBytesWritable]] +} + +object OrderingRowKey extends Ordering[SparkImmutableBytesWritable] { + def compare(a: SparkImmutableBytesWritable, b: SparkImmutableBytesWritable) = a.compareTo(b) +} \ No newline at end of file diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala index 065befbeb7971..a7e7a8ed413b8 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.types._ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer + private[hbase] case class HBaseRelation( tableName: String, hbaseNamespace: String, hbaseTableName: String, @@ -57,7 +58,6 @@ private[hbase] case class HBaseRelation( tableName: String, private def getConf: Configuration = if (configuration == null) HBaseConfiguration.create else configuration - lazy val attributes = nonKeyColumns.map(col => AttributeReference(col.sqlName, col.dataType, nullable = true)()) @@ -87,6 +87,20 @@ private[hbase] case class HBaseRelation( tableName: String, Option(partitions) } + + /** + * Return the start keys of all of the regions in this table, + * as a list of SparkImmutableBytesWritable. 
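+   * The bulk-load path uses these keys as the split points for HBasePartitioner, so that the
+   * shuffled output lines up with region boundaries, e.g. (mirroring BulkLoadIntoTable.execute):
+   * {{{
+   *   val splitKeys = relation.getRegionStartKeys().toArray
+   *   val partitioner = new HBasePartitioner(rdd)(splitKeys)
+   * }}}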
+ */ + def getRegionStartKeys() = { + val byteKeys: Array[Array[Byte]] = htable.getStartKeys + val ret = ArrayBuffer[SparkImmutableBytesWritable]() + for (byteKey <- byteKeys) { + ret += new SparkImmutableBytesWritable(byteKey) + } + ret + } + def buildFilter(projList: Seq[NamedExpression], rowKeyPredicate: Option[Expression], valuePredicate: Option[Expression]) = { diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala index 23ae0970e20b3..4f04eb9854086 100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseSQLParser.scala @@ -20,9 +20,15 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser} import org.apache.spark.sql.catalyst.SparkSQLParser -import org.apache.spark.sql.hbase.logical.{DropTablePlan, CreateHBaseTablePlan} +import org.apache.spark.sql.hbase.logical.{LoadDataIntoTable, CreateHBaseTablePlan, DropTablePlan} class HBaseSQLParser extends SqlParser { + + protected val DATA = Keyword("DATA") + protected val LOAD = Keyword("LOAD") + protected val LOCAL = Keyword("LOCAL") + protected val INPATH = Keyword("INPATH") + protected val BULK = Keyword("BULK") protected val CREATE = Keyword("CREATE") protected val DROP = Keyword("DROP") @@ -55,7 +61,7 @@ class HBaseSQLParser extends SqlParser { | EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2))} ) - | insert | create | drop | alter + | insert | create | drop | alter | load ) override protected lazy val insert: Parser[LogicalPlan] = @@ -137,6 +143,18 @@ class HBaseSQLParser extends SqlParser { case tn ~ op ~ tc ~ cf => null } + protected lazy val load: Parser[LogicalPlan] = + ( + (LOAD ~> DATA ~> INPATH ~> stringLit) ~ + (opt(OVERWRITE) ~> INTO ~> TABLE ~> relation) ^^ { + case filePath ~ table => LoadDataIntoTable(filePath, table, false) + } + | (LOAD ~> DATA ~> LOCAL ~> INPATH ~> stringLit) ~ + (opt(OVERWRITE) ~> INTO ~> TABLE ~> relation) ^^ { + case filePath ~ table => LoadDataIntoTable(filePath, table, true) + } + ) + protected lazy val tableCol: Parser[(String, String)] = ident ~ (STRING | BYTE | SHORT | INTEGER | LONG | FLOAT | DOUBLE | BOOLEAN) ^^ { case e1 ~ e2 => (e1, e2) diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala index a564674861fce..af3d8ad95dc7c 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HBaseStrategies.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner} import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution._ import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.hbase.execution._ +import org.apache.spark.sql.hbase.execution.{DropHbaseTableCommand, HBaseSQLTableScan, InsertIntoHBaseTable} /** * HBaseStrategies @@ -91,10 +91,12 @@ private[hbase] trait HBaseStrategies extends QueryPlanner[SparkPlan] { case logical.CreateHBaseTablePlan( tableName, nameSpace, hbaseTableName, colsSeq, keyCols, nonKeyCols) => - Seq(CreateHBaseTableCommand( + 
Seq(execution.CreateHBaseTableCommand( tableName, nameSpace, hbaseTableName, colsSeq, keyCols, nonKeyCols) (hbaseSQLContext)) + case logical.LoadDataIntoTable(path, table: HBaseRelation, isLocal) => + execution.BulkLoadIntoTable(path, table, isLocal)(hbaseSQLContext) :: Nil case InsertIntoTable(table: HBaseRelation, partition, child, _) => new InsertIntoHBaseTable(table, planLater(child))(hbaseSQLContext) :: Nil case logical.DropTablePlan(tableName) => diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HadoopReader.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HadoopReader.scala new file mode 100644 index 0000000000000..efaa301d98caf --- /dev/null +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/HadoopReader.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hbase + +import org.apache.hadoop.mapred.JobConf +import org.apache.spark.rdd.HadoopRDD +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.{SparkContext, SerializableWritable} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.hbase.util.Bytes +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.hbase.KeyValue + +/** + * Helper class for scanning files stored in Hadoop - e.g., to read text file when bulk loading. + */ +private[hbase] +class HadoopReader( + @transient sc: SparkContext, + @transient jobConf: JobConf) { + + private val broadcastedHiveConf = + sc.broadcast(new SerializableWritable(jobConf)) + + private val minSplitsPerRDD = + sc.getConf.get("spark.sql.hbase.minPartitions", sc.defaultMinPartitions.toString).toInt + + private val splitRegex = sc.getConf.get("spark.sql.hbase.splitRegex", ",") + + val inputFormatClass = classOf[TextInputFormat] + + val rowKeyIds = "0,1,2" + + def makeBulkLoadRDD = { + val rdd = new HadoopRDD( + sc, + broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]], + None, + inputFormatClass, + classOf[LongWritable], + classOf[Text], + minSplitsPerRDD) + + // Todo: use mapPartitions more better, now just simply code + // Only take the value (skip the key) because Hbase works only with values. 
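+    // Each text line is parsed into a (row key, Put) pair so the result can be range-partitioned
+    // by row key and then written out through HFileOutputFormat (the column family/qualifier are
+    // hard-coded to "cf"/"count" for now).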
+ rdd.map { value => + // Todo: needs info which field is rowkey and value from HbaseRelation here + val fields = value._2.toString.split(splitRegex) + val rowKey = Bytes.toBytes(fields(0)) + val rowKeyWritable = new SparkImmutableBytesWritable(rowKey) + val family = Bytes.toBytes("cf") + val qualifier = Bytes.toBytes("count") + val hbaseValue = Bytes.toBytes(fields(1)) +// val keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue) + // we should use Put?, which is better? keyvalue is for one column in column family, right? + val put = new Put(rowKey) + put.add(family, qualifier, hbaseValue) + (rowKeyWritable, put) + } + } + +} \ No newline at end of file diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala index 6473d6e0c5619..7d2ccfec18903 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/execution/HBaseOperators.scala @@ -16,12 +16,26 @@ */ package org.apache.spark.sql.hbase.execution +import java.util.Date +import java.text.SimpleDateFormat + +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat +import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.{CompatibilitySingletonFactory, HadoopShims} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{ShuffledRDD, RDD} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.{LeafNode, UnaryNode, SparkPlan} -import org.apache.spark.sql.hbase.{HBaseSQLReaderRDD, HBaseSQLContext, HBaseRelation} +import org.apache.spark.SparkContext._ +import org.apache.hadoop.mapred.{Reporter, JobConf} +import org.apache.hadoop.hbase.client.Put +import org.apache.spark.SerializableWritable +import org.apache.spark.sql.hbase._ +import org.apache.spark.sql.hbase.HBaseRelation /** * :: DeveloperApi :: @@ -67,3 +81,65 @@ case class InsertIntoHBaseTable( override def output = child.output } + +@DeveloperApi +case class BulkLoadIntoTable(path: String, relation: HBaseRelation, isLocal: Boolean)( + @transient hbContext: HBaseSQLContext) extends LeafNode { + + val jobConf = new JobConf(hbContext.sc.hadoopConfiguration) + + val hadoopReader = new HadoopReader(hbContext.sparkContext, jobConf) + + // atmp path for storing HFile + val tmpPath = "/bulkload/test" + + override def execute() = { + val ordering = HBasePartitioner.orderingRowKey + .asInstanceOf[Ordering[SparkImmutableBytesWritable]] + val splitKeys = relation.getRegionStartKeys() + val rdd = hadoopReader.makeBulkLoadRDD + val partitioner = new HBasePartitioner(rdd)(splitKeys.toArray) + val shuffled = + new ShuffledRDD[SparkImmutableBytesWritable, Put, Put](rdd, partitioner).setKeyOrdering(ordering) + + jobConf.setOutputKeyClass(classOf[SparkImmutableBytesWritable]) + jobConf.setOutputValueClass(classOf[Put]) + jobConf.set("mapred.output.format.class", classOf[HFileOutputFormat].getName) + jobConf.set("mapred.output.dir", tmpPath) + shuffled.saveAsHadoopDataset(jobConf) + + null + } + + def saveAsHFile( + rdd: RDD[(SparkImmutableBytesWritable, Put)], + jobConf: SerializableWritable[JobConf], + path: String) { + val conf = jobConf.value + val job = new Job(conf) + job.setOutputKeyClass(classOf[SparkImmutableBytesWritable]) + 
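+    // HFileOutputFormat expects records in row-key order, so callers should pass an RDD that is
+    // already key-sorted (e.g. the ShuffledRDD with setKeyOrdering built in execute()).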
job.setOutputValueClass(classOf[Put]) + job.setOutputFormatClass(classOf[HFileOutputFormat]) + FileOutputFormat.setOutputPath(job, new Path(path)) + + val wrappedConf = new SerializableWritable(job.getConfiguration) + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + val jobtrackerID = formatter.format(new Date()) + val stageId = sqlContext.sparkContext.newRddId() + + + def getWriter( + outFormat: HFileOutputFormat, + conf: Configuration, + path: Path, + reporter: Reporter) = { + val hadoopShim = CompatibilitySingletonFactory.getInstance(classOf[HadoopShims]) + val context = hadoopShim.createTestTaskAttemptContext(job, "attempt_200707121733_0001_m_000000_0") + outFormat.getRecordWriter(context) + } + ??? + } + + override def output = Nil + +} diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/hbaseOperators.scala similarity index 72% rename from sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala rename to sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/hbaseOperators.scala index 7befe2d9d9d17..3bc6b9a72890b 100644 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/HBaseLogicalPlans.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/logical/hbaseOperators.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.hbase.logical -import org.apache.spark.sql.catalyst.plans.logical.Command +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode, Command} case class CreateHBaseTablePlan(tableName: String, nameSpace: String, @@ -27,3 +27,18 @@ case class CreateHBaseTablePlan(tableName: String, ) extends Command case class DropTablePlan(tableName: String) extends Command + +/** + * Logical plan for Bulkload + * @param path input data file path + * @param child target relation + * @param isLocal using HDFS or local file + */ +case class LoadDataIntoTable(path: String, + child: LogicalPlan, + isLocal: Boolean) extends UnaryNode { + + override def output = Nil + + override def toString = s"LogicalPlan: LoadDataIntoTable(LOAD $path INTO $child)" +} diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/package.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/package.scala index 303723888dd5c..35bdd071dbd6e 100755 --- a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/package.scala +++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/package.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.sql +import org.apache.hadoop.hbase.io.ImmutableBytesWritable + package object hbase { type HBaseRawType = Array[Byte] } diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/BulkLoadIntoTableSuite.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/BulkLoadIntoTableSuite.scala new file mode 100644 index 0000000000000..d2e3e43d43ea3 --- /dev/null +++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/BulkLoadIntoTableSuite.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hbase + +import org.scalatest.FunSuite +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.{SparkContext, Logging, LocalSparkContext} +import org.apache.spark.rdd.ShuffledRDD +import org.apache.spark.SparkContext._ + +class BulkLoadIntoTableSuite extends FunSuite with LocalSparkContext with Logging{ + + test("write data to HFile") { + sc = new SparkContext("local", "test") + val jobConf = new JobConf(sc.hadoopConfiguration) + val hadoopReader = new HadoopReader(sc, jobConf) + // atmp path for storing HFile + val tmpPath = "/bulkload/test" + val ordering = HBasePartitioner.orderingRowKey + .asInstanceOf[Ordering[SparkImmutableBytesWritable]] + val splitKeys = (1 to 40).filter(_ % 5 == 0).filter(_ != 40).map { r => + new SparkImmutableBytesWritable(Bytes.toBytes(r)) + } + val rdd = hadoopReader.makeBulkLoadRDD + val partitioner = new HBasePartitioner(rdd)(splitKeys.toArray) + val shuffled = + new ShuffledRDD[SparkImmutableBytesWritable, Put, Put](rdd, partitioner).setKeyOrdering(ordering) + + jobConf.setOutputKeyClass(classOf[SparkImmutableBytesWritable]) + jobConf.setOutputValueClass(classOf[Put]) + jobConf.set("mapred.output.format.class", classOf[HFileOutputFormat].getName) + jobConf.set("mapred.output.dir", tmpPath) + shuffled.saveAsHadoopDataset(jobConf) + } + +} diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBulkloadParserTest.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBulkloadParserTest.scala new file mode 100644 index 0000000000000..19e0cf17190ae --- /dev/null +++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBulkloadParserTest.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hbase + +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.hbase.logical.LoadDataIntoTable +import org.scalatest.{FunSuite, Matchers} + +/** + * Test Suite for HBase bulkload feature + */ +class HBaseBulkloadParserTest extends FunSuite with Matchers { + + // Test if we can parse 'LOAD DATA LOCAL INPATH './usr/file.csv' INTO TABLE tb' + test("bulkload parser test, local file") { + + val parser = new HBaseSQLParser() + val sql = raw"LOAD DATA LOCAL INPATH './usr/file.csv' INTO TABLE tb" + //val sql = "select" + + val plan: LogicalPlan = parser(sql) + assert(plan != null) + assert(plan.isInstanceOf[LoadDataIntoTable]) + + val l = plan.asInstanceOf[LoadDataIntoTable] + assert(l.path.equals(raw"./usr/file.csv")) + assert(l.isLocal) + + assert(plan.children(0).isInstanceOf[UnresolvedRelation]) + val r = plan.children(0).asInstanceOf[UnresolvedRelation] + assert(r.tableName.equals("tb")) + } + + // Test if we can parse 'LOAD DATA INPATH '/usr/hdfsfile.csv' INTO TABLE tb' + test("bulkload parser test, load hdfs file") { + + val parser = new HBaseSQLParser() + val sql = raw"LOAD DATA INPATH '/usr/hdfsfile.csv' INTO TABLE tb" + //val sql = "select" + + val plan: LogicalPlan = parser(sql) + assert(plan != null) + assert(plan.isInstanceOf[LoadDataIntoTable]) + + val l = plan.asInstanceOf[LoadDataIntoTable] + assert(l.path.equals(raw"/usr/hdfsfile.csv")) + assert(!l.isLocal) + assert(plan.children(0).isInstanceOf[UnresolvedRelation]) + val r = plan.children(0).asInstanceOf[UnresolvedRelation] + assert(r.tableName.equals("tb")) + } +} diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala new file mode 100644 index 0000000000000..19dba15920dff --- /dev/null +++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBasePartitionerSuite.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hbase + +import org.scalatest.FunSuite +import org.apache.spark.{SparkConf, LocalSparkContext, SparkContext, Logging} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.rdd.ShuffledRDD +class HBasePartitionerSuite extends FunSuite with LocalSparkContext with Logging { + + val conf = new SparkConf(loadDefaults = false) + + test("test hbase partitioner") { + sc = new SparkContext("local", "test") + val data = (1 to 40).map { r => + val rowKey = Bytes.toBytes(r) + val rowKeyWritable = new SparkImmutableBytesWritable(rowKey) + (rowKeyWritable, r) + } + val rdd = sc.parallelize(data, 4) + val splitKeys = (1 to 40).filter(_ % 5 == 0).filter(_ != 40).map { r => + new SparkImmutableBytesWritable(Bytes.toBytes(r)) + } + val partitioner = new HBasePartitioner(rdd)(splitKeys.toArray) + val ordering = HBasePartitioner.orderingRowKey + .asInstanceOf[Ordering[SparkImmutableBytesWritable]] + val shuffled = + new ShuffledRDD[SparkImmutableBytesWritable, Int, Int](rdd, partitioner).setKeyOrdering(ordering) + + val groups = shuffled.mapPartitionsWithIndex { (idx, iter) => + iter.map(x => (x._2, idx)) + }.collect() + assert(groups.size == 40) + assert(groups.map(_._2).toSet.size == 8) + groups.foreach { r => + assert(r._1 > 5 * r._2 && r._1 <= 5 * (1 + r._2)) + } + } +}