diff --git a/assembly/pom.xml b/assembly/pom.xml
index 31a01e4d8e1de..56ae693543345 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -204,6 +204,16 @@
+    <profile>
+      <id>hbase</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-hbase_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
     <profile>
       <id>spark-ganglia-lgpl</id>
diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd
index 3cd0579aea8d3..9518b886a29cd 100644
--- a/bin/compute-classpath.cmd
+++ b/bin/compute-classpath.cmd
@@ -81,6 +81,7 @@ set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%tools\target\scala-%SCALA_VERSION%\clas
set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\catalyst\target\scala-%SCALA_VERSION%\classes
set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\core\target\scala-%SCALA_VERSION%\classes
set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\hive\target\scala-%SCALA_VERSION%\classes
+set SPARK_CLASSES=%SPARK_CLASSES%;%FWDIR%sql\hbase\target\scala-%SCALA_VERSION%\classes
set SPARK_TEST_CLASSES=%FWDIR%core\target\scala-%SCALA_VERSION%\test-classes
set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%repl\target\scala-%SCALA_VERSION%\test-classes
@@ -91,6 +92,7 @@ set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%streaming\target\scala-%SCALA
set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\catalyst\target\scala-%SCALA_VERSION%\test-classes
set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\core\target\scala-%SCALA_VERSION%\test-classes
set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\hive\target\scala-%SCALA_VERSION%\test-classes
+set SPARK_TEST_CLASSES=%SPARK_TEST_CLASSES%;%FWDIR%sql\hbase\target\scala-%SCALA_VERSION%\test-classes
if "x%SPARK_TESTING%"=="x1" (
rem Add test clases to path - note, add SPARK_CLASSES and SPARK_TEST_CLASSES before CLASSPATH
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index 905bbaf99b374..2ac345ba5cd04 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -58,6 +58,7 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
CLASSPATH="$CLASSPATH:$FWDIR/tools/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
+ CLASSPATH="$CLASSPATH:$FWDIR/sql/hbase/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes"
@@ -131,6 +132,7 @@ if [[ $SPARK_TESTING == 1 ]]; then
CLASSPATH="$CLASSPATH:$FWDIR/streaming/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/test-classes"
+ CLASSPATH="$CLASSPATH:$FWDIR/sql/hbase/target/scala-$SCALA_VERSION/test-classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/test-classes"
fi
diff --git a/core/pom.xml b/core/pom.xml
index a5a178079bc57..71b377786783e 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -391,6 +391,24 @@
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>test-jar-on-test-compile</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 554a33ce7f1a6..a02859aa38e69 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -39,7 +39,13 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
* the stream 'resets' object class descriptions have to be re-written)
*/
def writeObject[T: ClassTag](t: T): SerializationStream = {
- objOut.writeObject(t)
+    try {
+      objOut.writeObject(t)
+    } catch {
+      case e: Exception =>
+        System.err.println(s"Serialization error on $t of type ${t.getClass.getName}")
+        e.printStackTrace()
+        throw e
+    }
counter += 1
if (counterReset > 0 && counter >= counterReset) {
objOut.reset()
diff --git a/examples/pom.xml b/examples/pom.xml
index eb49a0e5af22d..be6544e515ab5 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -122,7 +122,88 @@
       <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase</artifactId>
+      <artifactId>hbase-common</artifactId>
+      <version>${hbase.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
       <version>${hbase.version}</version>
@@ -219,7 +300,8 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-install-plugin</artifactId>
-          <skip>true</skip>
+          <skip>false</skip>
+
diff --git a/pom.xml b/pom.xml
index 288bbf1114bea..cf975f2d723bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -94,6 +94,7 @@
     <module>streaming</module>
     <module>sql/catalyst</module>
     <module>sql/core</module>
+    <module>sql/hbase</module>
     <module>sql/hive</module>
     <module>repl</module>
     <module>assembly</module>
@@ -124,8 +125,8 @@
     <hadoop.version>1.0.4</hadoop.version>
     <protobuf.version>2.4.1</protobuf.version>
     <yarn.version>${hadoop.version}</yarn.version>
-    <hbase.version>0.94.6</hbase.version>
     <flume.version>1.4.0</flume.version>
+    <hbase.version>0.98.5-hadoop2</hbase.version>
     <zookeeper.version>3.4.5</zookeeper.version>
     <hive.version>0.12.0-protobuf-2.5</hive.version>
     <parquet.version>1.4.3</parquet.version>
@@ -855,13 +856,13 @@
                 <goal>testCompile</goal>
-            <execution>
-              <id>attach-scaladocs</id>
-              <phase>verify</phase>
-              <goals>
-                <goal>doc-jar</goal>
-              </goals>
-            </execution>
+
+
+
+
+
+
+
           <scalaVersion>${scala.version}</scalaVersion>
@@ -902,6 +903,7 @@
           <source>${java.version}</source>
           <target>${java.version}</target>
+ true
           <encoding>UTF-8</encoding>
           <maxmem>1024m</maxmem>
           <fork>true</fork>
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index b31a82f9b19ac..b9e80769aa965 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -1417,6 +1417,52 @@ def _get_hive_ctx(self):
return self._jvm.TestHiveContext(self._jsc.sc())
+
+class HBaseContext(SQLContext):
+
+    """A variant of Spark SQL that integrates with data stored in HBase.
+
+    Configuration for HBase is read from hbase-site.xml on the classpath.
+    """
+
+    def __init__(self, sparkContext, hbaseContext=None):
+        """Create a new HBaseContext.
+
+        @param sparkContext: The SparkContext to wrap.
+        @param hbaseContext: An optional JVM Scala HBaseContext. If set, we do not instantiate a
+        new HBaseContext in the JVM; instead we make all calls to this object.
+        """
+        SQLContext.__init__(self, sparkContext)
+
+        if hbaseContext:
+            self._scala_HBaseContext = hbaseContext
+
+    @property
+    def _ssql_ctx(self):
+        try:
+            if not hasattr(self, '_scala_HBaseContext'):
+                self._scala_HBaseContext = self._get_hbase_ctx()
+            return self._scala_HBaseContext
+        except Py4JError as e:
+            raise Exception("You must build Spark with HBase. "
+                            "Export 'SPARK_HBASE=true' and run "
+                            "sbt/sbt assembly", e)
+
+    def _get_hbase_ctx(self):
+        return self._jvm.HBaseContext(self._jsc.sc())
+
+    def sql(self, sqlQuery):
+        """Run a SQL query over HBase tables and return the result as an L{HBaseSchemaRDD}."""
+        return HBaseSchemaRDD(self._ssql_ctx.sql(sqlQuery).toJavaSchemaRDD(), self)
+
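+    # Usage sketch (illustrative only; the table name is hypothetical and an
+    # HBase-enabled Spark assembly must be on the classpath):
+    #
+    #   >>> hbctx = HBaseContext(sc)
+    #   >>> rdd = hbctx.sql("SELECT * FROM my_hbase_table")
+    #   >>> rdd.createTable("my_hbase_table_copy", True)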
+
def _create_row(fields, values):
row = Row(*values)
row.__FIELDS__ = fields
@@ -1801,6 +1847,14 @@ def _test():
if failure_count:
exit(-1)
+class HBaseSchemaRDD(SchemaRDD):
+    def createTable(self, tableName, overwrite=False):
+        """Creates the specified table and inserts the contents of this SchemaRDD into it.
+
+        Optionally overwriting any existing data.
+        """
+        self._jschema_rdd.createTable(tableName, overwrite)
+
if __name__ == "__main__":
_test()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index b4d606d37e732..d594c64b2a512 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst
+import java.lang.reflect.Method
+
import scala.language.implicitConversions
import org.apache.spark.sql.catalyst.analysis._
@@ -110,9 +112,9 @@ class SqlParser extends AbstractSparkSQLParser {
.getClass
.getMethods
.filter(_.getReturnType == classOf[Keyword])
- .map(_.invoke(this).asInstanceOf[Keyword].str)
-
+ .map{_.invoke(this).asInstanceOf[Keyword].str}
override val lexical = new SqlLexical(reservedWords)
+ println(reservedWords)
protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
exprs.zipWithIndex.map {
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index bd110218d34f7..3086a4d6264b5 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -92,6 +92,24 @@
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest-maven-plugin</artifactId>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>test-jar-on-test-compile</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 25ba7d88ba538..6b585e2fa314d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.LogicalRDD
diff --git a/sql/hbase/pom.xml b/sql/hbase/pom.xml
new file mode 100644
index 0000000000000..5f0812a69448b
--- /dev/null
+++ b/sql/hbase/pom.xml
@@ -0,0 +1,267 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-hbase_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project HBase</name>
+  <url>http://spark.apache.org/</url>
+  <properties>
+    <sbt.project.name>hbase</sbt.project.name>
+    <hbase.version>0.98.5-hadoop2</hbase.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <version>${hbase.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+      <version>${hbase.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>asm</groupId>
+          <artifactId>asm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jboss.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>hbase</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-scala-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/test/scala</source>
+                    <source>compatibility/src/test/scala</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <!-- basedir is spark/sql/hbase/ -->
+              <outputDirectory>${basedir}/../../lib_managed/jars</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+              <excludeGroupIds>org.datanucleus</excludeGroupIds>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/DataTypeUtils.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/DataTypeUtils.scala
new file mode 100644
index 0000000000000..79ad498e54d3f
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/DataTypeUtils.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hbase
+
+import java.io.{DataOutputStream, ByteArrayOutputStream, DataInputStream, ByteArrayInputStream}
+import java.math.BigDecimal
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.sql
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * DataTypeUtils
+ * Created by sboesch on 10/9/14.
+ */
+object DataTypeUtils {
+
+  def cmp(str1: Option[HBaseRawType], str2: Option[HBaseRawType]) = {
+    if (str1.isEmpty && str2.isEmpty) 0
+    else if (str1.isEmpty) -2
+    else if (str2.isEmpty) 2
+    else {
+      val s1arr = str1.get
+      val s2arr = str2.get
+      var ix = 0
+      var retval: Option[Int] = None
+      while (ix < s1arr.length && ix < s2arr.length && retval.isEmpty) {
+        if (s1arr(ix) != s2arr(ix)) {
+          retval = Some(Math.signum(s1arr(ix) - s2arr(ix)).toInt)
+        }
+        ix += 1
+      }
+      retval.getOrElse(
+        if (s1arr.length == s2arr.length) {
+          0
+        } else {
+          Math.signum(s1arr.length - s2arr.length).toInt
+        }
+      )
+    }
+  }
+
+ def compare(col1: HBaseRawType, dataType1: DataType,
+ col2: HBaseRawType, dataType2: DataType): Int = {
+ if (dataType1 != dataType2) {
+      throw new UnsupportedOperationException("Presently datatype casting is not supported")
+ } else dataType1 match {
+ case BinaryType => compare(col1, col2)
+ case StringType => compare(cast(col1, StringType), cast(col2, StringType))
+ case IntegerType => compare(cast(col1, IntegerType), cast(col2, IntegerType))
+ case LongType => compare(cast(col1, LongType), cast(col2, LongType))
+ case FloatType => compare(cast(col1, FloatType), cast(col2, FloatType))
+ case DoubleType => compare(cast(col1, DoubleType), cast(col2, DoubleType))
+ case _ => throw new UnsupportedOperationException(
+ s"DataTypeUtils.compare(with dataType): type $dataType1 not supported")
+ }
+ }
+
+ def cast(bytes: HBaseRawType, dataType: DataType): Any = {
+ val out = {
+ if (dataType == StringType) {
+ new String(bytes, HBaseByteEncoding)
+ } else if (dataType == BinaryType) {
+ bytes(0)
+ } else if (dataType == ByteType) {
+ bytes(0)
+ } else {
+ val bis = new ByteArrayInputStream(bytes)
+ val dis = new DataInputStream(bis)
+ val outval = dataType match {
+ case ShortType => dis.readShort
+ case IntegerType => dis.readInt
+ case LongType => dis.readLong
+ case FloatType => dis.readFloat
+ case DoubleType => dis.readDouble
+ case _ => throw new UnsupportedOperationException(s"Unsupported type ${dataType}")
+ }
+ dis.close
+ outval
+ }
+ }
+ out
+ }
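+
+  // Illustrative round-trip example: values written with toBytes (defined below) can be read
+  // back with cast, e.g.
+  //   cast(toBytes(12345: Integer), IntegerType) == 12345
+  //   cast(toBytes("row1"), StringType) == "row1"
+  // since both sides use big-endian DataOutput/DataInput encodings, and HBaseByteEncoding
+  // for strings.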
+
+ private def calcSizeOfPrimitive(a: Any): Int = {
+ val bos = new ByteArrayOutputStream(32)
+ val dos = new DataOutputStream(bos)
+ a match {
+ case b: Boolean =>
+ dos.writeBoolean(a.asInstanceOf[Boolean])
+ dos.size
+ case i: Integer =>
+ dos.writeInt(a.asInstanceOf[Integer])
+ dos.size
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"What type are you interested in ${a.getClass.getName} for its length?")
+ }
+ }
+
+ private val SizeOfBoolean = calcSizeOfPrimitive(true)
+ private val SizeOfInteger = calcSizeOfPrimitive(new Integer(1))
+
+ def toBytes(inval: Any): Array[Byte] = {
+ val out = inval match {
+ case barr: Array[Byte] =>
+ barr
+ case s: String =>
+ inval.asInstanceOf[String].getBytes(HBaseByteEncoding)
+ case b: Byte =>
+ Array(b)
+ case b: Boolean =>
+ val bos = new ByteArrayOutputStream(SizeOfBoolean)
+ val dos = new DataOutputStream(bos)
+ dos.writeBoolean(b)
+ bos.toByteArray
+ case s: Short =>
+ val bos = new ByteArrayOutputStream(2)
+ val dos = new DataOutputStream(bos)
+ dos.writeShort(s)
+ bos.toByteArray
+ case i: Integer =>
+ val bos = new ByteArrayOutputStream(SizeOfInteger)
+ val dos = new DataOutputStream(bos)
+ dos.writeInt(i)
+ bos.toByteArray
+ case l: Long =>
+ val bos = new ByteArrayOutputStream(8)
+ val dos = new DataOutputStream(bos)
+ dos.writeLong(l)
+ bos.toByteArray
+ case f: Float =>
+ val bos = new ByteArrayOutputStream(4)
+ val dos = new DataOutputStream(bos)
+ dos.writeFloat(f)
+ bos.toByteArray
+ case d: Double =>
+ val bos = new ByteArrayOutputStream(8)
+ val dos = new DataOutputStream(bos)
+ dos.writeDouble(d)
+ bos.toByteArray
+ case _ =>
+ throw
+ new UnsupportedOperationException(s"Unknown datatype in toBytes: ${inval.toString}")
+ }
+ out
+ }
+
+ def hbaseFieldToRowField(bytes: HBaseRawType, dataType: DataType): Any = cast(bytes, dataType)
+
+ def toDataType(clazz: Class[_]): sql.DataType = clazz match {
+ case c if c == classOf[String] => StringType
+ case c if c == classOf[Array[_]] => BinaryType
+ case c if c == classOf[Byte] => ByteType
+ case c if c == classOf[Short] => ShortType
+ case c if c == classOf[Integer] => IntegerType
+ case c if c == classOf[Long] => LongType
+ case c if c == classOf[Float] => FloatType
+ case c if c == classOf[Double] => DoubleType
+ case _ => throw new UnsupportedOperationException(
+ s"toDataType: class ${clazz.getName} not supported")
+ }
+
+ import reflect.runtime.universe._
+
+ def compare[T: TypeTag](col1: T, col2: T): Int = weakTypeOf[T] match {
+ case dt if dt == weakTypeOf[Array[_]] =>
+ compareRaw(col1.asInstanceOf[HBaseRawType], col2.asInstanceOf[HBaseRawType])
+ case dt if dt == weakTypeOf[String] =>
+ col1.asInstanceOf[String].compareTo(col2.asInstanceOf[String])
+    case dt if dt == weakTypeOf[Integer] =>
+      col1.asInstanceOf[Integer].compareTo(col2.asInstanceOf[Integer])
+    case dt if dt == weakTypeOf[Long] =>
+      col1.asInstanceOf[Long].compareTo(col2.asInstanceOf[Long])
+    case dt if dt == weakTypeOf[Float] =>
+      col1.asInstanceOf[Float].compareTo(col2.asInstanceOf[Float])
+    case dt if dt == weakTypeOf[Double] =>
+      col1.asInstanceOf[Double].compareTo(col2.asInstanceOf[Double])
+ case _ => throw new UnsupportedOperationException(
+ s"DataTypeUtils.compare: type ${weakTypeOf[T]} not supported")
+ }
+
+ def compareRaw(col1: HBaseRawType, col2: HBaseRawType) = {
+ if (col1 == null || col2 == null) {
+ throw new IllegalArgumentException("RelationalOperator: Can not compare nulls")
+ } else {
+ val c1len = col1.length
+ val c2len = col2.length
+ if (c1len == 0 && c2len == 0) {
+ 0
+ } else {
+ var ptr = 0
+ var retVal: Option[Int] = None
+ while (ptr < c1len && ptr < c2len) {
+ if (col1(ptr) < col2(ptr)) {
+ retVal = Some(-1)
+ } else if (col1(ptr) > col2(ptr)) {
+ retVal = Some(1)
+ } else {
+ ptr += 1
+ }
+ }
+ retVal.getOrElse(c1len - c2len)
+ }
+ }
+ }
+
+ import reflect.runtime.universe._
+
+ def sizeOf[T: TypeTag](t: T) = weakTypeOf[T] match {
+ case dt if dt == weakTypeOf[Byte] => 1
+ case dt if dt == weakTypeOf[Short] => 2
+    case dt if dt == weakTypeOf[Int] => 4
+ case dt if dt == weakTypeOf[Long] => 8
+ case dt if dt == weakTypeOf[Float] => 4
+ case dt if dt == weakTypeOf[Double] => 8
+ case dt if dt == weakTypeOf[String] => t.asInstanceOf[String].length
+ }
+
+ def schemaIndex(schema: StructType, sqlName: String) = {
+ schema.fieldNames.zipWithIndex.find { case (name: String, ix: Int) => name == sqlName}
+ .getOrElse((null, -1))._2
+ }
+
+ def catalystRowToHBaseRawVals(schema: StructType, row: Row, cols: HBaseCatalog.Columns):
+ HBaseRawRowSeq = {
+ val rawCols = cols.columns.zipWithIndex.map { case (col, ix) =>
+ val rx = schemaIndex(schema, col.sqlName)
+ val rType = schema(col.sqlName).dataType
+ // if (!kc.dataType == rx) {}
+ col.dataType match {
+ case StringType =>
+ row.getString(rx)
+ case ByteType =>
+ row.getByte(rx)
+        case ShortType =>
+          row.getShort(rx)
+ case IntegerType =>
+ row.getInt(rx)
+ case LongType =>
+ row.getLong(rx)
+ case FloatType =>
+ row.getFloat(rx)
+ case DoubleType =>
+ row.getDouble(rx)
+ case BooleanType =>
+ row.getBoolean(rx)
+        case _ =>
+          throw new UnsupportedOperationException(
+            s"Need to flesh out all datatypes: ${col.dataType}")
+ }
+ }
+ rawCols.map(toBytes(_))
+ }
+
+ def convertToBytes(dataType: DataType, data: Any): Array[Byte] = {
+ dataType match {
+ case StringType => Bytes.toBytes(data.asInstanceOf[String])
+ case FloatType => Bytes.toBytes(data.asInstanceOf[Float])
+ case IntegerType => Bytes.toBytes(data.asInstanceOf[Int])
+ case ByteType => Array(data.asInstanceOf[Byte])
+ case ShortType => Bytes.toBytes(data.asInstanceOf[Short])
+ case DoubleType => Bytes.toBytes(data.asInstanceOf[Double])
+ case LongType => Bytes.toBytes(data.asInstanceOf[Long])
+ case BinaryType => Bytes.toBytesBinary(data.asInstanceOf[String])
+ case BooleanType => Bytes.toBytes(data.asInstanceOf[Boolean])
+ case DecimalType => Bytes.toBytes(data.asInstanceOf[BigDecimal])
+ case TimestampType => throw new Exception("not supported")
+ case _ => throw new Exception("not supported")
+ }
+ }
+
+}
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseAnalyzer.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseAnalyzer.scala
new file mode 100644
index 0000000000000..e36f30f1856c0
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseAnalyzer.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hbase
+
+import org.apache.spark.sql.catalyst.analysis._
+
+class HBaseAnalyzer(catalog: Catalog,
+ registry: FunctionRegistry,
+ caseSensitive: Boolean)
+ extends Analyzer(catalog, registry, caseSensitive) {
+
+}
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseCatalog.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseCatalog.scala
new file mode 100644
index 0000000000000..9d6485daf386e
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseCatalog.scala
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hbase
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
+import org.apache.log4j.Logger
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.analysis.SimpleCatalog
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * HBaseCatalog
+ */
+private[hbase] class HBaseCatalog(@transient hbaseContext: HBaseSQLContext,
+ @transient configuration: Configuration)
+ extends SimpleCatalog(false) with Logging with Serializable {
+
+ import org.apache.spark.sql.hbase.HBaseCatalog._
+
+ @transient val logger = Logger.getLogger(getClass.getName)
+
+ override def registerTable(databaseName: Option[String], tableName: String,
+ plan: LogicalPlan): Unit = ???
+
+ // TODO(Bo): read the entire HBASE_META_TABLE and process it once, then cache it
+ // in this class
+ override def unregisterAllTables(): Unit = {
+ tables.clear
+ }
+
+ override def unregisterTable(databaseName: Option[String], tableName: String): Unit =
+ tables -= tableName
+
+ override def lookupRelation(nameSpace: Option[String], sqlTableName: String,
+ alias: Option[String]): LogicalPlan = {
+ val itableName = processTableName(sqlTableName)
+ val catalogTable = getTable(sqlTableName)
+ if (catalogTable.isEmpty) {
+ throw new IllegalArgumentException(
+ s"Table $nameSpace.$sqlTableName does not exist in the catalog")
+ }
+ val tableName = TableName.valueOf(nameSpace.orNull, itableName)
+ new HBaseRelation(configuration, hbaseContext, catalogTable.get)
+ }
+
+ protected def processTableName(tableName: String): String = {
+ if (!caseSensitive) {
+ tableName.toLowerCase
+ } else {
+ tableName
+ }
+ }
+
+ def getDataType(dataType: String): DataType = {
+ if (dataType.equalsIgnoreCase(StringType.simpleString)) {
+ StringType
+ } else if (dataType.equalsIgnoreCase(ByteType.simpleString)) {
+ ByteType
+ } else if (dataType.equalsIgnoreCase(ShortType.simpleString)) {
+ ShortType
+ } else if (dataType.equalsIgnoreCase(IntegerType.simpleString)) {
+ IntegerType
+ } else if (dataType.equalsIgnoreCase(LongType.simpleString)) {
+ LongType
+ } else if (dataType.equalsIgnoreCase(FloatType.simpleString)) {
+ FloatType
+ } else if (dataType.equalsIgnoreCase(DoubleType.simpleString)) {
+ DoubleType
+ } else if (dataType.equalsIgnoreCase(BooleanType.simpleString)) {
+ BooleanType
+ } else {
+ throw new IllegalArgumentException(s"Unrecognized data type '${dataType}'")
+ }
+ }
+
+ def getTable(tableName: String): Option[HBaseCatalogTable] = {
+ val table = new HTable(configuration, MetaData)
+
+ val get = new Get(Bytes.toBytes(tableName))
+ val rest1 = table.get(get)
+    if (rest1 == null || rest1.isEmpty) {
+ None
+ } else {
+ var columnList = List[Column]()
+ import scala.collection.mutable.{Seq => MutSeq}
+ var columnFamilies = MutSeq[(String)]()
+
+ var nonKeyColumns = Bytes.toString(rest1.getValue(ColumnFamily, QualNonKeyColumns))
+ if (nonKeyColumns != null) {
+ if (nonKeyColumns.length > 0) {
+ nonKeyColumns = nonKeyColumns.substring(0, nonKeyColumns.length - 1)
+ }
+
+ val nonKeyColumnArray = nonKeyColumns.split(";")
+ for (nonKeyColumn <- nonKeyColumnArray) {
+ val nonKeyColumnInfo = nonKeyColumn.split(",")
+ val sqlName = nonKeyColumnInfo(0)
+ val family = nonKeyColumnInfo(1)
+ val qualifier = nonKeyColumnInfo(2)
+ val dataType = getDataType(nonKeyColumnInfo(3))
+
+ val column = Column(sqlName, family, qualifier, dataType)
+ columnList = columnList :+ column
+ if (!(columnFamilies contains family)) {
+ columnFamilies = columnFamilies :+ family
+ }
+ }
+ }
+
+ // What if this were not an HBase table? We get NPE's here..
+ val hbaseName = Bytes.toString(rest1.getValue(ColumnFamily, QualHbaseName))
+ val hbaseNameArray = hbaseName.split(",")
+ val hbaseNamespace = hbaseNameArray(0)
+ val hbaseTableName = hbaseNameArray(1)
+
+ var keyColumns = Bytes.toString(rest1.getValue(ColumnFamily, QualKeyColumns))
+ if (keyColumns.length > 0) {
+ keyColumns = keyColumns.substring(0, keyColumns.length - 1)
+ }
+ val keyColumnArray = keyColumns.split(";")
+ var keysList = List[Column]()
+ for (keyColumn <- keyColumnArray) {
+ val index = keyColumn.indexOf(",")
+ val sqlName = keyColumn.substring(0, index)
+ val dataType = getDataType(keyColumn.substring(index + 1))
+ val qualName = sqlName
+ val col = Column(sqlName, null, qualName, dataType)
+ keysList = keysList :+ col
+ }
+ val rowKey = new Columns(keysList)
+
+ val fullHBaseName =
+ if (hbaseNamespace.length == 0) {
+ TableName.valueOf(hbaseTableName)
+ }
+ else {
+ TableName.valueOf(hbaseNamespace, hbaseTableName)
+ }
+
+ Some(HBaseCatalogTable(tableName,
+ SerializableTableName(fullHBaseName),
+ rowKey,
+ Seq(columnFamilies: _*),
+ new Columns(columnList)))
+ }
+ }
+
+ def createMetadataTable(admin: HBaseAdmin) = {
+ val desc = new HTableDescriptor(TableName.valueOf(MetaData))
+ val coldef = new HColumnDescriptor(ColumnFamily)
+ desc.addFamily(coldef)
+ admin.createTable(desc)
+ }
+
+ def checkHBaseTableExists(hbaseTableName: String): Boolean = {
+ val admin = new HBaseAdmin(configuration)
+ admin.tableExists(hbaseTableName)
+ }
+
+ def checkLogicalTableExist(tableName: String): Boolean = {
+ val admin = new HBaseAdmin(configuration)
+ if (!checkHBaseTableExists(MetaData)) {
+ // create table
+ createMetadataTable(admin)
+ }
+
+ val table = new HTable(configuration, MetaData)
+ val get = new Get(Bytes.toBytes(tableName))
+ val result = table.get(get)
+
+ result.size() > 0
+ }
+
+ def checkFamilyExists(hbaseTableName: String, family: String): Boolean = {
+ val admin = new HBaseAdmin(configuration)
+ val tableDescriptor = admin.getTableDescriptor(TableName.valueOf(hbaseTableName))
+ tableDescriptor.hasFamily(Bytes.toBytes(family))
+ }
+
+ def deleteTable(tableName: String): Unit = {
+ if (!checkLogicalTableExist(tableName)) {
+ throw new Exception("The logical table:" +
+ tableName + " doesn't exist")
+ }
+
+ val admin = new HBaseAdmin(configuration)
+ val table = new HTable(configuration, MetaData)
+
+ val delete = new Delete(Bytes.toBytes(tableName))
+ table.delete(delete)
+
+ table.close()
+ }
+
+ def createTable(hbaseNamespace: String,
+ tableName: String,
+ hbaseTableName: String,
+ keyColumns: Seq[KeyColumn],
+ nonKeyColumns: Columns
+ ): Unit = {
+ if (checkLogicalTableExist(tableName)) {
+ throw new Exception("The logical table:" +
+ tableName + " has already existed")
+ }
+
+ if (!checkHBaseTableExists(hbaseTableName)) {
+ throw new Exception("The HBase table " +
+ hbaseTableName + " doesn't exist")
+ }
+
+ nonKeyColumns.columns.foreach {
+ case Column(_, family, _, _, _) =>
+ if (!checkFamilyExists(hbaseTableName, family)) {
+ throw new Exception(
+ "The HBase table doesn't contain the Column Family: " +
+ family)
+ }
+ }
+
+ val admin = new HBaseAdmin(configuration)
+ val avail = admin.isTableAvailable(MetaData)
+
+ if (!avail) {
+ // create table
+ createMetadataTable(admin)
+ }
+
+ val table = new HTable(configuration, MetaData)
+ table.setAutoFlushTo(false)
+ val rowKey = tableName
+
+ val get = new Get(Bytes.toBytes(rowKey))
+ if (table.exists(get)) {
+ throw new Exception("row key exists")
+ }
+ else {
+ val put = new Put(Bytes.toBytes(rowKey))
+
+ val result1 = new StringBuilder
+ for (column <- nonKeyColumns.columns) {
+ val sqlName = column.sqlName
+ val family = column.family
+ val qualifier = column.qualifier
+ val dataType = column.dataType
+ result1.append(sqlName)
+ result1.append(",")
+ result1.append(family)
+ result1.append(",")
+ result1.append(qualifier)
+ result1.append(",")
+ result1.append(dataType.simpleString)
+ result1.append(";")
+ }
+ put.add(ColumnFamily, QualNonKeyColumns, Bytes.toBytes(result1.toString))
+
+ val result2 = new StringBuilder
+ result2.append(hbaseNamespace)
+ result2.append(",")
+ result2.append(hbaseTableName)
+ put.add(ColumnFamily, QualHbaseName, Bytes.toBytes(result2.toString))
+
+ val result3 = new StringBuilder
+ for (column <- keyColumns) {
+ val sqlName = column.sqlName
+ val dataType = column.dataType
+ result3.append(sqlName)
+ result3.append(",")
+ result3.append(dataType.simpleString)
+ result3.append(";")
+ }
+ put.add(ColumnFamily, QualKeyColumns, Bytes.toBytes(result3.toString))
+
+ table.put(put)
+
+ table.flushCommits()
+ }
+ }
+
+
+}
+
+object HBaseCatalog {
+
+ import org.apache.spark.sql.catalyst.types._
+
+ val MetaData = "metadata"
+ val ColumnFamily = Bytes.toBytes("colfam")
+ val QualKeyColumns = Bytes.toBytes("keyColumns")
+ val QualNonKeyColumns = Bytes.toBytes("nonKeyColumns")
+ val QualHbaseName = Bytes.toBytes("hbaseName")
+
+ case class Column(sqlName: String, family: String, qualifier: String,
+ dataType: DataType,
+ ordinal: Int = -1) extends Ordered[Column] {
+ def fullName = s"$family:$qualifier"
+
+ def toColumnName = ColumnName(Some(family), qualifier)
+
+ override def hashCode(): Int = {
+ sqlName.hashCode * 31 + (if (family != null) family.hashCode * 37 else 0) +
+ qualifier.hashCode * 41 + dataType.hashCode * 43 + ordinal.hashCode * 47
+ }
+
+ override def equals(obj: scala.Any): Boolean = {
+ val superEquals = super.equals(obj)
+ val retval = hashCode == obj.hashCode
+ retval // note: superEquals is false whereas retval is true. Interesting..
+ }
+
+ override def compare(that: Column): Int = {
+ -(ordinal - that.ordinal)
+ }
+ }
+
+ object Column extends Serializable {
+ def toAttributeReference(col: Column): AttributeReference = {
+ AttributeReference(col.sqlName, col.dataType,
+ nullable = true)()
+ }
+ }
+
+ class Columns(inColumns: Seq[Column]) extends Serializable {
+ private val colx = new java.util.concurrent.atomic.AtomicInteger
+
+ val columns = inColumns.map {
+ case Column(s, f, q, d, -1) => Column(s, f, q, d, nextOrdinal)
+ case col => col
+ }
+
+ def nextOrdinal() = colx.getAndIncrement
+
+ def apply(colName: ColumnName) = {
+ map(colName)
+ }
+
+ def apply(colName: String): Option[Column] = {
+ val Pat = "(.*):(.*)".r
+ colName match {
+ case Pat(colfam, colqual) => toOpt(map(ColumnName(Some(colfam), colqual)))
+ case sqlName: String => findBySqlName(sqlName)
+ }
+ }
+
+ def toOpt[A: reflect.ClassTag](a: A): Option[A] = a match {
+ case a: Some[A] => a
+ case None => None
+ case a: A => Some(a)
+ }
+
+ def findBySqlName(sqlName: String): Option[Column] = {
+ map.iterator.find { case (cname, col) =>
+ col.sqlName == sqlName
+ }.map(_._2)
+ }
+
+ def toColumnNames() = {
+ columns.map(_.toColumnName)
+ }
+
+ import scala.collection.mutable
+
+ private val map: mutable.Map[ColumnName, Column] =
+ columns.foldLeft(mutable.Map[ColumnName, Column]()) { case (m, c) =>
+ m(ColumnName(if (c.family != null) Some(c.family) else None,
+ c.qualifier)) = c
+ m
+ }
+
+ def getColumn(colName: String): Option[Column] = map.get(ColumnName(colName))
+
+    def families() = columns.map(_.family).toSet
+
+ def asAttributes() = {
+ columns.map { col =>
+ Column.toAttributeReference(col)
+ }
+ }
+
+ override def equals(that: Any) = {
+ // that.isInstanceOf[Columns] && that.hashCode == hashCode
+ if (!that.isInstanceOf[Columns]) {
+ false
+ } else {
+ val other = that.asInstanceOf[Columns]
+ val result = other.columns.size == columns.size && columns.zip(other.columns)
+ .forall { case (col, ocol) =>
+ col.equals(ocol)
+ }
+ result
+ }
+ }
+
+ override def hashCode() = {
+ val hash = columns.foldLeft(47 /* arbitrary start val .. */) {
+ _ + _.hashCode
+ }
+ hash
+ }
+
+ }
+
+ case class HBaseCatalogTable(tablename: String,
+ hbaseTableName: SerializableTableName,
+                               rowKey: Columns, // Should do RowKey for generalization
+ colFamilies: Seq[String],
+ columns: Columns) {
+
+ val rowKeyColumns = rowKey
+
+ lazy val allColumns = new Columns(rowKeyColumns.columns ++ columns.columns)
+
+ }
+
+ case class KeyColumn(sqlName: String, dataType: DataType)
+
+ // Following supports Pluggable RowKey.
+ trait RowKey
+
+ case class TypedRowKey(columns: Columns) extends RowKey
+
+ case object RawBytesRowKey extends RowKey
+
+ // Convenience method to aid in validation/testing
+ private[hbase] def getKeysFromAllMetaTableRows(configuration: Configuration)
+ : Seq[HBaseRawType] = {
+ val htable = new HTable(configuration, MetaData)
+ val scan = new Scan
+ scan.setFilter(new FirstKeyOnlyFilter())
+ val scanner = htable.getScanner(scan)
+ import scala.collection.JavaConverters._
+ import scala.collection.mutable
+ val rkeys = mutable.ArrayBuffer[HBaseRawType]()
+ val siter = scanner.iterator.asScala
+ while (siter.hasNext) {
+ rkeys += siter.next.getRow
+ }
+ rkeys
+ }
+
+}
+
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBasePartition.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBasePartition.scala
new file mode 100644
index 0000000000000..4877dfa13ea80
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBasePartition.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hbase
+
+import org.apache.log4j.Logger
+import org.apache.spark.Partition
+import org.apache.spark.sql.hbase._
+
+/**
+ * HBasePartition
+ * Created by sboesch on 9/9/14.
+ */
+case class HBasePartitionBounds(start : Option[HBaseRawType], end: Option[HBaseRawType]) {
+
+ def contains(rowKey: Optionable[HBaseRawType]) = {
+ import DataTypeUtils.cmp
+ !rowKey.opt.isEmpty && cmp(rowKey.opt, start) >= 0 && cmp(rowKey.opt, end) <= 0
+ }
+}
+
+case class HBasePartition(idx : Int, bounds : HBasePartitionBounds,
+ server: Option[String]) extends Partition {
+
+ /**
+ * Get the split's index within its parent RDD
+ */
+ override def index: Int = idx
+
+}
+object HBasePartition {
+ val SinglePartition = new HBasePartition(1, HBasePartitionBounds(None, None), None)
+}
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseRelation.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseRelation.scala
new file mode 100644
index 0000000000000..78bac4033a3f0
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseRelation.scala
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase
+
+import java.util
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client.{Row => HRow, _}
+import org.apache.hadoop.hbase.filter.{FilterBase, FilterList}
+import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
+import org.apache.log4j.Logger
+import org.apache.spark.Partition
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.hbase.DataTypeUtils._
+import org.apache.spark.sql.hbase.HBaseCatalog._
+import org.apache.spark.sql.{SchemaRDD, StructType}
+
+import scala.collection.SortedMap
+import scala.collection.immutable.TreeMap
+
+/**
+ * HBaseRelation
+ *
+ * Created by stephen.boesch@huawei.com on 9/8/14
+ */
+private[hbase] case class HBaseRelation(
+ @transient var configuration: Configuration,
+ @transient var hbaseContext: HBaseSQLContext,
+ catalogTable: HBaseCatalogTable)
+ extends LeafNode {
+
+ self: Product =>
+
+ import org.apache.spark.sql.hbase.HBaseRelation._
+
+ // TODO: use external resource or HConnectionManager.createConnection
+ @transient lazy val handle: HTable = {
+ val tab = new HTable(configuration, getTableName)
+ tab
+ }
+
+ def getHTable() = handle
+
+  def closeHTable() = handle.close()
+
+ def isPartitioned = true
+
+ def tableName = getTableName
+
+ def getTableName() = {
+ catalogTable.hbaseTableName.tableName.getNameAsString
+ }
+
+ def buildFilter(rowKeyPredicates: Seq[Expression],
+ colPredicates: Seq[Expression]) = {
+ var colFilters: Option[FilterList] = None
+ if (HBaseStrategies.PushDownPredicates) {
+ // Now process the projection predicates
+ // TODO: rewrite the predicates based on Catalyst Expressions
+
+ // TODO: Do column pruning based on only the required colFamilies
+ val filters: HBaseSQLFilters = new HBaseSQLFilters(colFamilies,
+ rowKeyPredicates, colPredicates)
+ val colFilters = filters.createColumnFilters
+
+ // TODO: Perform Partition pruning based on the rowKeyPredicates
+
+ }
+ }
+
+ val applyFilters = false
+
+ def getScanner(split: Partition): Scan = {
+ val hbPartition = split.asInstanceOf[HBasePartition]
+ val scan = if (applyFilters) {
+ new Scan(hbPartition.bounds.start.get,
+ hbPartition.bounds.end.get)
+ } else {
+ new Scan
+ }
+ if (applyFilters) {
+ colFamilies.foreach { cf =>
+ scan.addFamily(s2b(cf))
+ }
+ }
+ scan
+ }
+
+ @transient val logger = Logger.getLogger(getClass.getName)
+
+ lazy val partitionKeys: Seq[Attribute] = catalogTable.rowKey.asAttributes
+
+ lazy val attributes = catalogTable.columns.asAttributes
+
+ lazy val colFamilies = catalogTable.colFamilies
+
+ @transient lazy val rowKeyParser = HBaseRelation.RowKeyParser
+
+ def buildPut(schema: StructType, row: Row): Put = {
+ val ctab = catalogTable
+ val rkey = rowKeyParser.createKeyFromCatalystRow(schema, ctab.rowKey, row)
+ val p = new Put(rkey)
+ DataTypeUtils.catalystRowToHBaseRawVals(schema, row, ctab.columns).zip(ctab.columns.columns)
+ .map { case (raw, col) => p.add(s2b(col.family), s2b(col.qualifier), raw)
+ }
+ p
+ }
+
+ @transient lazy val connection = getHBaseConnection(configuration)
+
+ lazy val hbPartitions = HBaseRelation
+ .getPartitions(catalogTable.hbaseTableName.tableName, configuration).toArray
+
+ def getPartitions(): Array[Partition] = hbPartitions.asInstanceOf[Array[Partition]]
+
+ override def output: Seq[Attribute] = attributes ++ partitionKeys
+
+
+ def buildFilters(rowKeyPredicates: Seq[Expression], colPredicates: Seq[Expression])
+ : HBaseSQLFilters = {
+ new HBaseSQLFilters(colFamilies, rowKeyPredicates, colPredicates)
+ }
+
+ def getRowPrefixPredicates(predicates: Seq[Expression]) = {
+
+ // Filter out all predicates that only deal with partition keys, these are given to the
+ // hive table scan operator to be used for partition pruning.
+ val partitionKeys = catalogTable.rowKey.asAttributes()
+
+ val partitionKeyIds = AttributeSet(partitionKeys)
+ var (rowKeyPredicates, _ /*otherPredicates*/ ) = predicates.partition {
+ _.references.subsetOf(partitionKeyIds)
+ }
+
+ // Find and sort all of the rowKey dimension elements and stop as soon as one of the
+ // composite elements is not found in any predicate
+ val loopx = new AtomicLong
+ val foundx = new AtomicLong
+ val rowPrefixPredicates = for {pki <- partitionKeyIds
+ if ((loopx.incrementAndGet >= 0)
+ && rowKeyPredicates.flatMap {
+ _.references
+ }.contains(pki)
+ && (foundx.incrementAndGet == loopx.get))
+ attrib <- rowKeyPredicates.filter {
+ _.references.contains(pki)
+ }
+ } yield attrib
+ rowPrefixPredicates
+ }
+
+
+ def isOnlyBinaryComparisonPredicates(predicates: Seq[Expression]) = {
+ predicates.forall(_.isInstanceOf[BinaryPredicate])
+ }
+
+ class HBaseSQLFilters(colFamilies: Seq[String],
+ rowKeyPreds: Seq[Expression],
+ opreds: Seq[Expression])
+ extends FilterBase {
+ @transient val logger = Logger.getLogger(getClass.getName)
+
+ def createColumnFilters(): Option[FilterList] = {
+ val colFilters: FilterList =
+ new FilterList(FilterList.Operator.MUST_PASS_ALL)
+ // colFilters.addFilter(new HBaseRowFilter(colFamilies,
+ // catalogTable.rowKeyColumns.columns,
+ // rowKeyPreds.orNull))
+ opreds.foreach {
+ case preds: Seq[Expression] =>
+ // TODO; re-do the predicates logic using expressions
+ // new SingleColumnValueFilter(s2b(col.colName.family.get),
+ // colFilters.addFilter(f)
+ // }
+ colFilters
+ }
+ Some(colFilters)
+ }
+ }
+
+ /**
+ * Presently only a sequence of AND predicates supported. TODO(sboesch): support simple tree
+ * of AND/OR predicates
+ */
+ class HBaseRowFilter(colFamilies: Seq[String],
+ rkCols: Seq[Column],
+ rowKeyPreds: Seq[Expression]
+ ) extends FilterBase {
+ @transient val logger = Logger.getLogger(getClass.getName)
+
+ override def filterRowKey(rowKey: Array[Byte], offset: Int, length: Int): Boolean = {
+
+ if (!isOnlyBinaryComparisonPredicates(rowKeyPreds)) {
+ false // Presently only simple binary comparisons supported
+ } else {
+ val catColumns: Columns = catalogTable.columns
+ val keyColumns: Columns = catalogTable.rowKey
+ def catalystToHBaseColumnName(catColName: String) = {
+ catColumns.findBySqlName(catColName)
+ }
+
+ def getName(expression: NamedExpression) = expression.asInstanceOf[NamedExpression].name
+
+ val rowPrefixPreds = getRowPrefixPredicates(rowKeyPreds
+ .asInstanceOf[Seq[BinaryExpression]])
+ // TODO: fix sorting of rowprefix preds
+ val rowKeyColsMap = rowKeyParser.parseRowKeyWithMetaData(rkCols, rowKey)
+ val result = rowKeyPreds.forall { p =>
+ p.eval(Row(rowKeyColsMap.values.map {
+ _._2
+ })).asInstanceOf[Boolean]
+ }
+ result
+ }
+ }
+
+ override def isFamilyEssential(name: Array[Byte]): Boolean = {
+ colFamilies.contains(new String(name, HBaseByteEncoding).toLowerCase())
+ }
+
+ def rowKeyOrdinal(name: ColumnName) = catalogTable.rowKey(name).ordinal
+
+ }
+
+}
+
+object HBaseRelation {
+ @transient private lazy val lazyConfig = HBaseConfiguration.create()
+
+ def configuration() = lazyConfig
+
+ def getHBaseConnection(configuration: Configuration) = {
+ val connection = HConnectionManager.createConnection(configuration)
+ connection
+ }
+
+ def getPartitions(tableName: TableName,
+ config: Configuration) = {
+ import scala.collection.JavaConverters._
+ val hConnection = getHBaseConnection(config)
+ val regionLocations = hConnection.locateRegions(tableName)
+ case class BoundsAndServers(startKey: HBaseRawType, endKey: HBaseRawType,
+ servers: Seq[String])
+ val regionBoundsAndServers = regionLocations.asScala.map { hregionLocation =>
+ val regionInfo = hregionLocation.getRegionInfo
+ BoundsAndServers(regionInfo.getStartKey, regionInfo.getEndKey,
+ Seq(hregionLocation.getServerName.getHostname))
+ }
+ val partSeq = regionBoundsAndServers.zipWithIndex.map { case (rb, ix) =>
+ new HBasePartition(ix, HBasePartitionBounds(Some(rb.startKey), Some(rb.endKey)),
+ Some(rb.servers(0)))
+ }
+ partSeq.toIndexedSeq
+ }
+
+ def rowKeysFromRows(schemaRdd: SchemaRDD, relation: HBaseRelation) = {
+ assert(schemaRdd != null)
+ assert(relation != null)
+ assert(relation.rowKeyParser != null)
+ schemaRdd.map { r: Row =>
+ relation.rowKeyParser.createKeyFromCatalystRow(
+ schemaRdd.schema,
+ relation.catalogTable.rowKeyColumns,
+ r)
+ }
+ }
+
+
+ /**
+ * Trait for RowKeyParser's that convert a raw array of bytes into their constituent
+ * logical column values
+ *
+ */
+ trait AbstractRowKeyParser {
+
+ def createKey(rawBytes: HBaseRawRowSeq, version: Byte): HBaseRawType
+
+ def parseRowKey(rowKey: HBaseRawType): HBaseRawRowSeq // .NavigableMap[String, HBaseRawType]
+
+ def parseRowKeyWithMetaData(rkCols: Seq[Column], rowKey: HBaseRawType)
+ : SortedMap[ColumnName, (Column, Any)]
+ }
+
+ case class RowKeySpec(offsets: Seq[Int], version: Byte = RowKeyParser.Version1)
+
+ // TODO(Bo): replace the implementation with the null-byte terminated string logic
+ object RowKeyParser extends AbstractRowKeyParser with Serializable {
+
+
+ val Version1 = 1.toByte
+
+ val VersionFieldLen = 1
+ // Length in bytes of the RowKey version field
+ val DimensionCountLen = 1
+ // One byte for the number of key dimensions
+ val MaxDimensions = 255
+ val OffsetFieldLen = 2
+
+ // Two bytes for the value of each dimension offset.
+ // Therefore max size of rowkey is 65535. Note: if longer rowkeys desired in future
+ // then simply define a new RowKey version to support it. Otherwise would be wasteful
+ // to define as 4 bytes now.
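+    // Illustrative layout (hypothetical key values): for the two dimensions "ab" and "c"
+    // under Version1 the encoded rowkey is
+    //   [0x01]['a' 'b']['c'][0x00 0x01][0x00 0x03][0x02]
+    //   version  dim1  dim2   offset1    offset2  dimCount
+    // giving computeLength = 1 + (2 + 1) + 2 * 2 + 1 = 9 bytes, with the two offsets pointing
+    // at indices 1 and 3 where the dimension values start.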
+ def computeLength(keys: HBaseRawRowSeq) = {
+ VersionFieldLen + keys.map {
+ _.length
+ }.sum + OffsetFieldLen * keys.size + DimensionCountLen
+ }
+
+ override def createKey(keys: HBaseRawRowSeq, version: Byte = Version1): HBaseRawType = {
+ var barr = new Array[Byte](computeLength(keys))
+ val arrayx = new AtomicInteger(0)
+ barr(arrayx.getAndAdd(VersionFieldLen)) = version // VersionByte
+
+ // Remember the starting offset of first data value
+ val valuesStartIndex = new AtomicInteger(arrayx.get)
+
+ // copy each of the dimension values in turn
+ keys.foreach { k => copyToArr(barr, k, arrayx.getAndAdd(k.length))}
+
+ // Copy the offsets of each dim value
+ // The valuesStartIndex is the location of the first data value and thus the first
+ // value included in the Offsets sequence
+ keys.foreach { k =>
+ copyToArr(barr,
+ short2b(valuesStartIndex.getAndAdd(k.length).toShort),
+ arrayx.getAndAdd(OffsetFieldLen))
+ }
+ barr(arrayx.get) = keys.length.toByte // DimensionCountByte
+ barr
+ }
+
+ def copyToArr[T](a: Array[T], b: Array[T], aoffset: Int) = {
+ b.copyToArray(a, aoffset)
+ }
+
+ def short2b(sh: Short): Array[Byte] = {
+ val barr = Array.ofDim[Byte](2)
+ barr(0) = ((sh >> 8) & 0xff).toByte
+ barr(1) = (sh & 0xff).toByte
+ barr
+ }
+
+    def b2Short(barr: Array[Byte]) = {
+      ((barr(0) & 0xff) << 8) | (barr(1) & 0xff)
+    }
+
+ def createKeyFromCatalystRow(schema: StructType, keyCols: Columns, row: Row) = {
+ val rawKeyCols = DataTypeUtils.catalystRowToHBaseRawVals(schema, row, keyCols)
+ createKey(rawKeyCols)
+ }
+
+ def getMinimumRowKeyLength = VersionFieldLen + DimensionCountLen
+
+ override def parseRowKey(rowKey: HBaseRawType): HBaseRawRowSeq = {
+
+ assert(rowKey.length >= getMinimumRowKeyLength,
+ s"RowKey is invalid format - less than minlen . Actual length=${rowKey.length}")
+ assert(rowKey(0) == Version1, s"Only Version1 supported. Actual=${rowKey(0)}")
+ val ndims: Int = rowKey(rowKey.length - 1).toInt
+ val offsetsStart = rowKey.length - DimensionCountLen - ndims * OffsetFieldLen
+ val rowKeySpec = RowKeySpec(
+ for (dx <- 0 to ndims - 1)
+ yield b2Short(rowKey.slice(offsetsStart + dx * OffsetFieldLen,
+ offsetsStart + (dx + 1) * OffsetFieldLen))
+ )
+
+ val endOffsets = rowKeySpec.offsets.tail :+ (rowKey.length - DimensionCountLen - 1)
+ val colsList = rowKeySpec.offsets.zipWithIndex.map { case (off, ix) =>
+ rowKey.slice(off, endOffsets(ix))
+ }
+ colsList
+ }
+
+ override def parseRowKeyWithMetaData(rkCols: Seq[Column], rowKey: HBaseRawType):
+ SortedMap[ColumnName, (Column, Any)] = {
+ import scala.collection.mutable.HashMap
+
+ val rowKeyVals = parseRowKey(rowKey)
+ val rmap = rowKeyVals.zipWithIndex.foldLeft(new HashMap[ColumnName, (Column, Any)]()) {
+ case (m, (cval, ix)) =>
+ m.update(rkCols(ix).toColumnName, (rkCols(ix),
+ hbaseFieldToRowField(cval, rkCols(ix).dataType)))
+ m
+ }
+ TreeMap(rmap.toArray: _*)(Ordering.by { cn: ColumnName => rmap(cn)._1.ordinal})
+ .asInstanceOf[SortedMap[ColumnName, (Column, Any)]]
+ }
+
+ def show(bytes: Array[Byte]) = {
+ val len = bytes.length
+ val out = s"Version=${bytes(0).toInt} NumDims=${bytes(len - 1)} "
+ }
+
+ }
+
+}
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLContext.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLContext.scala
new file mode 100644
index 0000000000000..385b66c879b47
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLContext.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase._
+import org.apache.spark.SparkContext
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.hbase.HBaseCatalog.{Column, Columns, KeyColumn}
+
+/**
+ * An instance of the Spark SQL execution engine that integrates with data stored in HBase.
+ * Configuration for HBase is read from hbase-site.xml on the classpath.
+ */
+class HBaseSQLContext(@transient val sc: SparkContext, @transient val hbaseConf: Configuration
+ = HBaseConfiguration.create())
+ extends SQLContext(sc) with Serializable {
+ self =>
+
+ @transient val configuration = hbaseConf
+
+ @transient
+ override protected[sql] lazy val catalog: HBaseCatalog = new HBaseCatalog(this, configuration)
+
+ @transient val hBasePlanner = new SparkPlanner with HBaseStrategies {
+
+ val hbaseContext = self
+ SparkPlan.currentContext.set(self)
+
+ override val strategies: Seq[Strategy] = Seq(
+ CommandStrategy(self),
+ TakeOrdered,
+ InMemoryScans,
+ HBaseTableScans,
+ HashAggregation,
+ LeftSemiJoin,
+ HashJoin,
+ BasicOperators,
+ CartesianProduct,
+ BroadcastNestedLoopJoin,
+ HBaseOperations
+ )
+ }
+
+ @transient
+ override protected[sql] val planner = hBasePlanner
+
+ override private[spark] val dialect: String = "hbaseql"
+
+ override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
+ new this.QueryExecution {
+ val logical = plan
+ }
+
+ /** Extends QueryExecution with HBase specific features. */
+ protected[sql] abstract class QueryExecution extends super.QueryExecution {
+ }
+
+ @transient
+ override protected[sql] val parser = new HBaseSQLParser
+
+ override def parseSql(sql: String): LogicalPlan = parser(sql)
+
+ override def sql(sqlText: String): SchemaRDD = {
+ if (dialect == "sql") {
+ super.sql(sqlText)
+ } else if (dialect == "hbaseql") {
+ new SchemaRDD(this, parser(sqlText))
+ } else {
+ sys.error(s"Unsupported SQL dialect: $dialect. Try 'sql' or 'hbaseql'")
+ }
+ }
+
+ override lazy val analyzer = new HBaseAnalyzer(catalog,
+ functionRegistry, true) {
+ }
+
+ def createHbaseTable(nameSpace: String,
+ tableName: String,
+ hbaseTable: String,
+ keyCols: Seq[(String, String)],
+ nonKeyCols: Seq[(String, String, String, String)]): Unit = {
+ val keyColumns = keyCols.map { case (name, typeOfData) =>
+ KeyColumn(name, catalog.getDataType(typeOfData.toLowerCase))
+ }
+ val nonKeyColumns = new Columns(nonKeyCols.map {
+ case (name, typeOfData, family, qualifier) =>
+ Column(name, family, qualifier, catalog.getDataType(typeOfData))
+ })
+
+ catalog.createTable(nameSpace, tableName, hbaseTable, keyColumns, nonKeyColumns)
+ }
+
+ def dropHbaseTable(tableName: String): Unit = {
+ catalog.deleteTable(tableName)
+ }
+
+}
+
+object HBaseSQLContext {
+ def createConfigurationFromSerializedFields(serializedProps: Array[Byte]) = {
+ val conf = HBaseConfiguration.create
+ val bis = new ByteArrayInputStream(serializedProps)
+ conf.readFields(new DataInputStream(bis))
+ conf
+ }
+
+ def serializeConfiguration(configuration: Configuration) = {
+ val bos = new ByteArrayOutputStream
+    configuration.write(new DataOutputStream(bos))
+ bos.toByteArray
+ }
+
+}
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLFilter.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLFilter.scala
new file mode 100644
index 0000000000000..10630855b7066
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLFilter.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hbase
+
+import java.util
+
+import org.apache.hadoop.hbase.Cell
+import org.apache.hadoop.hbase.client.Scan
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode
+import org.apache.hadoop.hbase.filter._
+import org.apache.log4j.Logger
+import DataTypeUtils._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.hbase.HBaseCatalog.Column
+
+/**
+ * HBaseSQLFilter: a set of push-down filters for optimizing column pruning
+ * and row filtering using HBase Scan/Filter constructs.
+ *
+ * Created by sboesch on 9/22/14.
+ */
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLParser.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLParser.scala
new file mode 100644
index 0000000000000..f583cd3b93cc2
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLParser.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hbase
+
+import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+
+class HBaseSQLParser extends SqlParser {
+ protected val BULK = Keyword("BULK")
+ protected val CREATE = Keyword("CREATE")
+ protected val DROP = Keyword("DROP")
+ protected val ALTER = Keyword("ALTER")
+ protected val EXISTS = Keyword("EXISTS")
+ protected val MAPPED = Keyword("MAPPED")
+ protected val ADD = Keyword("ADD")
+ protected val KEYS = Keyword("KEYS")
+ protected val COLS = Keyword("COLS")
+ protected val BYTE = Keyword("BYTE")
+ protected val SHORT = Keyword("SHORT")
+ protected val INTEGER = Keyword("INTEGER")
+ protected val LONG = Keyword("LONG")
+ protected val FLOAT = Keyword("FLOAT")
+ protected val DOUBLE = Keyword("DOUBLE")
+ protected val BOOLEAN = Keyword("BOOLEAN")
+
+ protected val newReservedWords: Seq[String] =
+ this.getClass
+ .getMethods
+ .filter(_.getReturnType == classOf[Keyword])
+ .map(_.invoke(this).asInstanceOf[Keyword].str)
+
+ override val lexical = new SqlLexical(newReservedWords)
+
+ override protected lazy val query: Parser[LogicalPlan] = (
+ select * (
+ UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2)} |
+ INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2)} |
+ EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
+ UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2))}
+ )
+ | insert | cache | create | drop | alter
+ )
+
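+  // Example of the DDL this rule accepts (taken from the HBase test suites; the names are
+  // illustrative):
+  //   CREATE TABLE t1 (t1c1 STRING, t1c2 STRING)
+  //   MAPPED BY (ht1, KEYS=[t1c1], COLS=[t1c2=cf1.cq11])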
+ protected lazy val create: Parser[LogicalPlan] =
+ CREATE ~> TABLE ~> ident ~
+ ("(" ~> tableCols <~ ")") ~
+ (MAPPED ~> BY ~> "(" ~> opt(nameSpace)) ~
+ (ident <~ ",") ~
+ (KEYS ~> "=" ~> "[" ~> keys <~ "]" <~ ",") ~
+ (COLS ~> "=" ~> "[" ~> expressions <~ "]" <~ ")") <~ opt(";") ^^ {
+
+ case tableName ~ tableColumns ~ tableNameSpace ~ hbaseTableName ~ keySeq ~ mappingInfo =>
+        // The lexer does not recognize the "=" symbol the way we need, so the mapping is
+        // first parsed as expressions and then translated into Map[String, (String, String)].
+        // TODO: this extracts the info by string hacking; replace with a proper grammar rule if possible
+ val infoMap: Map[String, (String, String)] =
+ mappingInfo.map { case EqualTo(e1, e2) =>
+ val info = e2.toString.substring(1).split('.')
+            if (info.length != 2) throw new Exception("\nSyntax error in CREATE TABLE: expected family.qualifier")
+ e1.toString.substring(1) ->(info(0), info(1))
+ }.toMap
+
+ val tableColSet = tableColumns.unzip._1.toSet
+ val keySet = keySeq.toSet
+ if (tableColSet.size != tableColumns.length ||
+ keySet.size != keySeq.length ||
+ !(keySet union infoMap.keySet).equals(tableColSet)) {
+          throw new Exception("\nSyntax error in CREATE TABLE: duplicate columns or key/mapping mismatch")
+ }
+
+ val customizedNameSpace = tableNameSpace.getOrElse("")
+ val partitionResultOfTableColumns = tableColumns.partition {
+ case (name, _) =>
+ keySeq.contains(name)
+ }
+ val keyColDataTypes = keySeq.toList.map{ orderedKeyCol =>
+ partitionResultOfTableColumns._1.find{ allCol =>
+ allCol._1 == orderedKeyCol
+ }.get._2
+ }
+ val keyColsWithDataTypes = keySeq.zip(keyColDataTypes)
+// zip(partitionResultOfTableColumns._1.map{_._2})
+ val nonKeyCols = partitionResultOfTableColumns._2.map {
+ case (name, typeOfData) =>
+ val infoElem = infoMap.get(name).get
+ (name, typeOfData, infoElem._1, infoElem._2)
+ }
+ CreateHBaseTablePlan(tableName, customizedNameSpace, hbaseTableName,
+ keyColsWithDataTypes, nonKeyCols)
+ }
+
+ protected lazy val drop: Parser[LogicalPlan] =
+ DROP ~> TABLE ~> ident <~ opt(";") ^^ {
+ case tableName => DropTablePlan(tableName)
+ }
+
+ protected lazy val alter: Parser[LogicalPlan] =
+ ALTER ~> TABLE ~> ident ~ DROP ~ ident <~ opt(";") ^^ {
+      case tn ~ op ~ col => null // TODO: ALTER TABLE ... DROP is not implemented yet
+ } | ALTER ~> TABLE ~> ident ~ ADD ~ tableCol ~ (MAPPED ~> BY ~> "(" ~> expressions <~ ")") ^^ {
+      case tn ~ op ~ tc ~ cf => null // TODO: ALTER TABLE ... ADD is not implemented yet
+ }
+
+ protected lazy val tableCol: Parser[(String, String)] =
+ ident ~ (STRING | BYTE | SHORT | INTEGER | LONG | FLOAT | DOUBLE | BOOLEAN) ^^ {
+ case e1 ~ e2 => (e1, e2)
+ }
+
+ protected lazy val nameSpace: Parser[String] = ident <~ "."
+
+ protected lazy val tableCols: Parser[Seq[(String, String)]] = repsep(tableCol, ",")
+
+ protected lazy val keys: Parser[Seq[String]] = repsep(ident, ",")
+
+ protected lazy val expressions: Parser[Seq[Expression]] = repsep(expression, ",")
+
+}
+
+case class CreateHBaseTablePlan(tableName: String,
+ nameSpace: String,
+ hbaseTable: String,
+ keyCols: Seq[(String, String)],
+ nonKeyCols: Seq[(String, String, String, String)]
+ ) extends Command
+
+case class DropTablePlan(tableName: String) extends Command
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLReaderRDD.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLReaderRDD.scala
new file mode 100644
index 0000000000000..b14d590d69d8d
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLReaderRDD.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hbase
+
+import org.apache.hadoop.hbase.client.Result
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.log4j.Logger
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.{Partition, TaskContext}
+
+import scala.collection.mutable
+
+/**
+ * HBaseSQLReaderRDD: an RDD that scans an HBase table on behalf of HBaseSQLTableScan,
+ * applying the given projection and predicates on the worker side.
+ * Created by sboesch on 9/16/14.
+ */
+class HBaseSQLReaderRDD(relation: HBaseRelation,
+ projList: Seq[NamedExpression],
+ columnPruningPred: Seq[Expression],
+ rowKeyFilterPred: Seq[Expression],
+ partitionPred: Seq[Expression],
+ coprocSubPlan: Option[SparkPlan],
+ @transient hbaseContext: HBaseSQLContext)
+ extends RDD[Row](hbaseContext.sparkContext, Nil) {
+
+
+ @transient val logger = Logger.getLogger(getClass.getName)
+
+  // The serialized configuration carries everything workers need to connect to HBase.
+  // For now the Config/connection logic is hardcoded.
+ @transient lazy val configuration = relation.configuration
+ @transient lazy val connection = relation.connection
+
+ override def getPartitions: Array[Partition] = relation.getPartitions()
+
+ /**
+ * Optionally overridden by subclasses to specify placement preferences.
+ */
+ override protected def getPreferredLocations(split: Partition): Seq[String] = {
+ split.asInstanceOf[HBasePartition].server.map {
+ identity
+ }.toSeq
+ }
+
+ val applyFilters: Boolean = false
+ val serializedConfig = HBaseSQLContext.serializeConfiguration(configuration)
+
+ override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
+
+ relation.configuration = HBaseSQLContext
+ .createConfigurationFromSerializedFields(serializedConfig)
+
+ val scan = relation.getScanner(split)
+ if (applyFilters) {
+ val colFilters = relation.buildFilters(rowKeyFilterPred, columnPruningPred)
+ }
+
+ @transient val htable = relation.getHTable()
+ @transient val scanner = htable.getScanner(scan)
+ new Iterator[Row] {
+
+ import scala.collection.mutable
+
+ val map = new mutable.HashMap[String, HBaseRawType]()
+
+ var onextVal: Row = _
+
+ def nextRow(): Row = {
+ val result = scanner.next
+ if (result != null) {
+ onextVal = toRow(result, projList)
+ onextVal
+ } else {
+ null
+ }
+ }
+
+ val ix = new java.util.concurrent.atomic.AtomicInteger()
+
+ override def hasNext: Boolean = {
+ if (onextVal != null) {
+ true
+ } else {
+ nextRow() != null
+ }
+ }
+
+ override def next(): Row = {
+ if (onextVal != null) {
+ val tmp = onextVal
+ onextVal = null
+ tmp
+ } else {
+ nextRow
+ }
+ }
+ }
+ }
+
+ def toRow(result: Result, projList: Seq[NamedExpression]): Row = {
+    // TODO(sboesch): analyze whether there can be multiple Cells in the result
+ // Also, consider if we should go lower level to the cellScanner()
+ val row = result.getRow
+ val rkCols = relation.catalogTable.rowKeyColumns
+ val rowKeyMap = relation.rowKeyParser.parseRowKeyWithMetaData(rkCols.columns, row)
+ var rmap = new mutable.HashMap[String, Any]()
+
+ rkCols.columns.foreach { rkcol =>
+ rmap.update(rkcol.qualifier, rowKeyMap(rkcol.toColumnName))
+ }
+
+ val jmap = new java.util.TreeMap[Array[Byte], Array[Byte]](Bytes.BYTES_COMPARATOR)
+ // rmap.foreach { case (k, v) =>
+ // jmap.put(s2b(k), CatalystToHBase.toByteus(v))
+ // }
+ val vmap = result.getNoVersionMap
+ vmap.put(s2b(""), jmap)
+ val rowArr = projList.zipWithIndex.
+ foldLeft(new Array[Any](projList.size)) {
+ case (arr, (cname, ix)) =>
+          if (rmap.get(cname.name).isDefined) {
+ arr(ix) = rmap.get(cname.name).get.asInstanceOf[Tuple2[_, _]]._2
+ } else {
+ val col = relation.catalogTable.columns.findBySqlName(projList(ix).name).getOrElse {
+ throw new IllegalArgumentException(s"Column ${projList(ix).name} not found")
+ }
+ val dataType = col.dataType
+ val qual = s2b(col.qualifier)
+ val fam = s2b(col.family)
+ arr(ix) = DataTypeUtils.hbaseFieldToRowField(
+ vmap.get(fam).get(qual)
+ , dataType)
+ }
+ arr
+ }
+ Row(rowArr: _*)
+ }
+
+
+}
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLTableScan.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLTableScan.scala
new file mode 100644
index 0000000000000..bce13b5343327
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseSQLTableScan.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{LeafNode, SparkPlan}
+
+/**
+ * HBaseSQLTableScan: the physical operator that scans an HBase relation,
+ * producing rows through an HBaseSQLReaderRDD.
+ * Created by sboesch on 9/2/14.
+ */
+case class HBaseSQLTableScan(
+ otherAttributes: Seq[Attribute],
+ attributes: Seq[Attribute],
+ relation: HBaseRelation,
+ projList: Seq[NamedExpression],
+ columnPruningPredicates: Seq[Expression],
+ rowKeyPredicates: Seq[Expression],
+ partitionPruningPredicates: Seq[Expression],
+ coProcessorPlan: Option[SparkPlan])
+ (@transient context: HBaseSQLContext)
+ extends LeafNode {
+
+ /**
+ * Runs this query returning the result as an RDD.
+ */
+ override def execute(): RDD[Row] = {
+
+ new HBaseSQLReaderRDD(
+ relation,
+ projList,
+ columnPruningPredicates, // TODO:convert to column pruning preds
+ rowKeyPredicates,
+ rowKeyPredicates, // PartitionPred : Option[Expression]
+ None, // coprocSubPlan: SparkPlan
+ context
+ )
+ }
+
+ override def output = attributes
+
+}
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseStrategies.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseStrategies.scala
new file mode 100644
index 0000000000000..a8e13731dc40f
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseStrategies.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client.HTable
+import org.apache.hadoop.hbase.filter.{Filter => HFilter}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.{SQLContext, SchemaRDD}
+
+/**
+ * HBaseStrategies: planner strategies that map logical plans onto HBase-specific
+ * physical operators (table scans, create/drop table, and inserts).
+ * Created by sboesch on 8/22/14.
+ */
+private[hbase] trait HBaseStrategies extends QueryPlanner[SparkPlan] {
+ self: SQLContext#SparkPlanner =>
+
+ import org.apache.spark.sql.hbase.HBaseStrategies._
+
+ val hbaseContext: HBaseSQLContext
+
+
+ /**
+ * Retrieves data using a HBaseTableScan. Partition pruning predicates are also detected and
+ * applied.
+ */
+ object HBaseTableScans extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case PhysicalOperation(projectList, inPredicates, relation: HBaseRelation) =>
+
+ // Filter out all predicates that only deal with partition keys
+ val partitionsKeys = AttributeSet(relation.partitionKeys)
+ val (rowKeyPredicates, otherPredicates) = inPredicates.partition {
+ _.references.subsetOf(partitionsKeys)
+ }
+
+ // TODO: Ensure the outputs from the relation match the expected columns of the query
+
+ val predAttributes = AttributeSet(inPredicates.flatMap(_.references))
+ val projectSet = AttributeSet(projectList.flatMap(_.references))
+
+ val attributes = projectSet ++ predAttributes
+
+ val rowPrefixPredicates = relation.getRowPrefixPredicates(rowKeyPredicates)
+
+ // partitionRowKeyPredicates.flatMap { partitionSpecificRowKeyPredicates =>
+ def projectionToHBaseColumn(expr: NamedExpression,
+ hbaseRelation: HBaseRelation): ColumnName = {
+ hbaseRelation.catalogTable.allColumns.findBySqlName(expr.name).map(_.toColumnName).get
+ }
+
+ val rowKeyPreds: Seq[Expression] = if (!rowPrefixPredicates.isEmpty) {
+ Seq(rowPrefixPredicates.reduceLeft(And))
+ } else {
+ Nil
+ }
+
+ val scanBuilder: (Seq[Attribute] => SparkPlan) = HBaseSQLTableScan(
+        _, // TODO: this first parameter is unused, but the code does not compile without it
+ attributes.map {
+ _.toAttribute
+ }.toSeq,
+ relation,
+ projectList,
+ otherPredicates,
+ rowKeyPreds,
+ rowKeyPreds,
+ None // coprocSubPlan
+ )(hbaseContext)
+
+ pruneFilterProject(
+ projectList,
+ inPredicates,
+ identity[Seq[Expression]], // removeRowKeyPredicates,
+ scanBuilder) :: Nil
+
+ case _ =>
+ Nil
+ }
+ }
+
+ def getHTable(conf: Configuration, tname: String) = {
+ val htable = new HTable(conf, tname)
+ htable
+ }
+
+ def sparkFilterProjectJoinToHBaseScan(sFilter: Filter,
+ sProject: Projection, sJoin: Join) = {
+ // TODO..
+ }
+
+ @inline def assertFromClosure(p: Boolean, msg: String) = {
+ if (!p) {
+ throw new IllegalStateException(s"AssertionError: $msg")
+ }
+ }
+
+ case class InsertIntoHBaseTable(
+ relation: HBaseRelation,
+ child: SparkPlan,
+ overwrite: Boolean = false)
+ (hbContext: HBaseSQLContext)
+ extends UnaryNode {
+ override def execute() = {
+ val childRdd = child.execute().asInstanceOf[SchemaRDD]
+ assertFromClosure(childRdd != null, "InsertIntoHBaseTable: the source RDD failed")
+
+ putToHBase(childRdd, relation, hbContext)
+ childRdd
+ }
+
+ override def output = child.output
+ }
+
+ case class InsertIntoHBaseTableFromRdd(
+ relation: HBaseRelation,
+ childRdd: SchemaRDD,
+ bulk: Boolean = false,
+ overwrite: Boolean = false)
+ (hbContext: HBaseSQLContext)
+ extends UnaryNode {
+ override def execute() = {
+    assert(childRdd != null, "InsertIntoHBaseTableFromRdd: the source RDD is null")
+
+ putToHBase(childRdd, relation, hbContext)
+ childRdd
+ }
+
+ override def child: SparkPlan = SparkLogicalPlan(
+ ExistingRdd(childRdd.queryExecution.executedPlan.output, childRdd))(hbContext)
+ .alreadyPlanned
+
+ override def output = child.output
+ }
+
+ object HBaseOperations extends Strategy {
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case CreateHBaseTablePlan(tableName, nameSpace, hbaseTableName, keyCols, nonKeyCols) =>
+ Seq(CreateHBaseTableCommand(tableName, nameSpace, hbaseTableName, keyCols, nonKeyCols)
+ (hbaseContext))
+ case logical.InsertIntoTable(table: HBaseRelation, partition, child, overwrite) =>
+ new InsertIntoHBaseTable(table, planLater(child), overwrite)(hbaseContext) :: Nil
+ case DropTablePlan(tableName) => Seq(DropHbaseTableCommand(tableName)(hbaseContext))
+ case _ => Nil
+ }
+
+ }
+
+}
+
+object HBaseStrategies {
+
+ // TODO: set to true when the logic for PDP has been tested
+ val PushDownPredicates = false
+
+ // WIP
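+  // Writes every partition of the given SchemaRDD to HBase: the HBase Configuration is
+  // shipped to executors in serialized form, re-created per partition, and each row is
+  // converted to a Put via relation.buildPut before being written to the HTable.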
+ def putToHBase(schemaRdd: SchemaRDD,
+ relation: HBaseRelation,
+ @transient hbContext: HBaseSQLContext) {
+
+ val schema = schemaRdd.schema
+ val serializedProps = HBaseSQLContext.serializeConfiguration(hbContext.configuration)
+ schemaRdd.mapPartitions { partition =>
+ if (!partition.isEmpty) {
+ println("we are running the putToHBase..")
+ val configuration = HBaseSQLContext.createConfigurationFromSerializedFields(serializedProps)
+ val tableIf = relation.getHTable
+ partition.map { case row =>
+ val put = relation.buildPut(schema, row)
+ tableIf.put(put)
+ if (!partition.hasNext) {
+ relation.closeHTable
+ }
+ row
+ }
+ } else {
+ new Iterator[(Row, HBaseRawType)]() {
+ override def hasNext: Boolean = false
+
+ override def next(): (Row, HBaseRawType) = null
+ }
+ }
+ }
+ }
+
+}
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseTable.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseTable.scala
new file mode 100644
index 0000000000000..650987e000c6d
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/HBaseTable.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hbase
+
+import org.apache.log4j.Logger
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+/**
+ * HBaseTable: metadata describing a logical HBase-backed table
+ * (name, alias, row-key columns, columns, and partitions).
+ * Created by sboesch on 9/16/14.
+ */
+case class HBaseTable(
+ tableName: String,
+ alias: Option[String],
+ rowkeyColumns : Seq[Attribute],
+ columns : Seq[Attribute],
+ partitions: Seq[HBasePartition]
+ ) {
+}
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/TestHbase.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/TestHbase.scala
new file mode 100644
index 0000000000000..b6401d5678ec4
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/TestHbase.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{SQLConf, SQLContext}
+
+/** A SQLContext that can be used for local testing. */
+object TestHbase
+ extends HBaseSQLContext(new SparkContext("local[2]", "TestSQLContext", new SparkConf())) {
+
+ /** Fewer partitions to speed up testing. */
+ override private[spark] def numShufflePartitions: Int =
+ getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+}
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/TestRDD.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/TestRDD.scala
new file mode 100644
index 0000000000000..f150e761715c5
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/TestRDD.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase
+
+import org.apache.log4j.Logger
+import org.apache.spark.{TaskContext, Partition, SparkContext}
+import org.apache.spark.rdd.RDD
+
+class TestRDD(parent : RDD[String], happyFace : String, nPartitions: Int)
+ extends RDD[String](parent) {
+
+ @transient val logger = Logger.getLogger(getClass.getName)
+ val parentDebugString = parent.toDebugString
+
+ def myHappyFace = happyFace
+
+ override def compute(split: Partition, context: TaskContext): Iterator[String]
+ = List(s"My partition is ${split.index} says parent is /* ${parentDebugString}").iterator
+
+ override protected def getPartitions: Array[Partition] = Array.tabulate[Partition](nPartitions){
+ pindex : Int => new Partition() { def index = pindex }}
+}
+
+object TestRdd {
+ def test() = {
+ //val myrdd = sc.parallelize( (0 until 100).map{ n => s"Hi there $n"},2)
+ val NPartitions = 10
+    // NOTE: placeholder only; a real SparkContext must be supplied for this scratch test to run
+    val sc = null.asInstanceOf[SparkContext]
+ val myrdd = sc.parallelize( (0 until 100).map{ n => s"Hi there $n"}, NPartitions)
+ val myTestRdd = new TestRDD(myrdd,"MyHappyFace", NPartitions)
+
+ import java.io._
+
+ val objFile = "/tmp/rdd.out"
+ val fos = new FileOutputStream(objFile)
+ val oos = new ObjectOutputStream(fos)
+    oos.writeObject(myTestRdd)
+    oos.close()
+ val fis = new FileInputStream(objFile)
+ val ois = new ObjectInputStream(fis)
+ val myNewSerializedRdd = ois.readObject
+ val collector = myNewSerializedRdd.asInstanceOf[TestRDD]
+ println(s"Collector class is ${collector.getClass.getName}")
+ println("%s".format(collector.getClass.getMethods.mkString("Methods: [",",","]")))
+ println(s"Collector is ${collector.toDebugString}")
+ println(s"Collect output: ${collector.collect}")
+ myNewSerializedRdd
+ }
+}
+
+//TestRdd.test
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/TestingSchemaRDD.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/TestingSchemaRDD.scala
new file mode 100644
index 0000000000000..4f6fc63951002
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/TestingSchemaRDD.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hbase
+
+import org.apache.log4j.Logger
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{Row, SchemaRDD}
+
+/**
+ * TestingSchemaRDD
+ * Created by sboesch on 10/6/14.
+ */
+class TestingSchemaRDD(@transient sqlContext: HBaseSQLContext,
+ @transient baseLogicalPlan: LogicalPlan)
+ extends SchemaRDD(sqlContext, baseLogicalPlan) {
+ @transient val logger = Logger.getLogger(getClass.getName)
+
+ /** A private method for tests, to look at the contents of each partition */
+ override private[spark] def collectPartitions(): Array[Array[Row]] = {
+ sparkContext.runJob(this, (iter: Iterator[Row]) => iter.toArray, partitions.map{_.index},
+ allowLocal=true)
+ }
+
+}
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/hBaseCommands.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/hBaseCommands.scala
new file mode 100644
index 0000000000000..91f46a7594369
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/hBaseCommands.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hbase
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.{Command, LeafNode}
+
+case class CreateHBaseTableCommand(tableName: String,
+ nameSpace: String,
+ hbaseTable: String,
+ keyCols: Seq[(String, String)],
+ nonKeyCols: Seq[(String, String, String, String)])
+ (@transient context: HBaseSQLContext)
+ extends LeafNode with Command {
+
+ override protected[sql] lazy val sideEffectResult = {
+ context.createHbaseTable(nameSpace, tableName, hbaseTable, keyCols, nonKeyCols)
+ Seq.empty[Row]
+ }
+
+ override def output: Seq[Attribute] = Seq.empty
+}
+
+case class DropHbaseTableCommand(tableName: String)
+ (@transient context: HBaseSQLContext)
+ extends LeafNode with Command {
+
+ override protected[sql] lazy val sideEffectResult = {
+ context.dropHbaseTable(tableName)
+ Seq.empty[Row]
+ }
+
+ override def output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/hbaseColumns.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/hbaseColumns.scala
new file mode 100644
index 0000000000000..a35ac25f81042
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/hbaseColumns.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase
+
+import org.apache.spark.sql.DataType
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types.{IntegerType, LongType, StringType}
+
+case class ColumnName(var family: Option[String], qualifier: String) {
+ if (family.isDefined && family.get == null) {
+ family = None
+ }
+
+ override def toString = fullName
+
+ def fullName = if (family.isDefined) {
+ s"$family:$qualifier"
+ } else {
+ s":$qualifier"
+ }
+
+ // override def equals(other: Any) = {
+ // if (!other.isInstanceOf[ColumnName]) {
+ // false
+ // }
+ // val cother = other.asInstanceOf[ColumnName]
+ // family == cother.family && qualifier == cother.qualifier
+ // }
+}
+
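+// How the factory below parses the two accepted forms (illustrative):
+//   ColumnName("cf1:cq1") => ColumnName(Some("cf1"), "cq1")
+//   ColumnName("cq1")     => ColumnName(None, "cq1")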
+object ColumnName {
+ def apply(compoundStr: String) = {
+ val toks = compoundStr.split(":").toList
+ if (toks.size == 2) {
+ new ColumnName(Some(toks(0)), toks(1))
+ } else {
+ new ColumnName(None, toks(0))
+ }
+ }
+}
diff --git a/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/package.scala b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/package.scala
new file mode 100644
index 0000000000000..c3199c21339ef
--- /dev/null
+++ b/sql/hbase/src/main/scala/org/apache/spark/sql/hbase/old/package.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericMutableRow}
+
+import scala.language.implicitConversions
+
+/**
+ * Package-level types and helpers for the HBase integration: raw byte-array row/column
+ * types, the byte encoding, and assorted utilities.
+ * Created by sboesch on 9/22/14.
+ */
+package object hbase {
+
+ type HBaseRawType = Array[Byte]
+ type HBaseRawRow = Array[HBaseRawType]
+ type HBaseRawRowSeq = Seq[HBaseRawType]
+
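+  // ISO-8859-1 maps every byte value to a single character, so strings and raw byte
+  // arrays can be converted back and forth without loss.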
+ val HBaseByteEncoding = "ISO-8859-1"
+
+ class HBaseRow(vals: HBaseRawRow) extends GenericRow(vals.asInstanceOf[Array[Any]])
+
+ def s2b(str: String) = str.getBytes(HBaseByteEncoding)
+
+ class Optionable[T <: AnyRef](value: T) {
+ @inline def opt: Option[T] = if (value == null) { None } else { Some(value) }
+ }
+
+ implicit def anyRefToOptionable[T <: AnyRef](value: T) = new Optionable(value)
+
+ implicit def hbaseRawTypeComparable(hbaseRaw: HBaseRawType): Comparable[HBaseRawType] = {
+ new Comparable[HBaseRawType]() {
+ override def compareTo(o: HBaseRawType): Int = {
+ DataTypeUtils.cmp(Some(hbaseRaw), Some(o))
+ }
+ }
+ }
+
+ case class SerializableTableName(@transient inTableName: TableName) {
+ val namespace = inTableName.getNamespace
+ val name = inTableName.getQualifier
+ @transient lazy val tableName: TableName = TableName.valueOf(namespace, name)
+ }
+
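+  /**
+   * Binary search over a sorted IndexedSeq: returns the index of an element whose extracted
+   * key equals `key`, or else the index of the first element with a larger key; returns None
+   * when every element's key is smaller than `key`.
+   */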
+ def binarySearchLowerBound[T, U](xs: IndexedSeq[T], key: U, keyExtract:
+ (T) => U = (x: T) => x)(implicit ordering: Ordering[U]): Option[Int] = {
+ var len = xs.length
+ var first = 0
+ var found = false
+ while (!found && len > 0) {
+ val half = len >>> 1
+ val middle = first + half
+ val arrval = keyExtract(xs(middle))
+      if (ordering.equiv(arrval, key)) {
+ first = middle
+ found = true
+ } else if (ordering.lt(arrval, key)) {
+ first = middle + 1
+ len = len - half - 1
+ } else {
+ len = half
+ }
+ }
+ if (first < xs.length) {
+ Some(first)
+ } else {
+ None
+ }
+ }
+
+ val MinByteArr = {
+ val barr = new Array[Byte](1)
+ barr(0) = 0.toByte
+ barr
+ }
+ val MaxByteArr = {
+ Array.fill[Byte](512)(0xff.toByte) // Think that's probably long enough..
+ }
+}
diff --git a/sql/hbase/src/test/resources/log4j.properties b/sql/hbase/src/test/resources/log4j.properties
new file mode 100644
index 0000000000000..faf2fb68dbc60
--- /dev/null
+++ b/sql/hbase/src/test/resources/log4j.properties
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file core/target/unit-tests.log
+log4j.rootLogger=INFO, CA, FA
+
+#Console Appender
+log4j.appender.CA=org.apache.log4j.ConsoleAppender
+log4j.appender.CA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
+log4j.appender.CA.Threshold = INFO
+
+
+#File Appender
+log4j.appender.FA=org.apache.log4j.FileAppender
+log4j.appender.FA.append=false
+log4j.appender.FA.file=target/unit-tests.log
+log4j.appender.FA.layout=org.apache.log4j.PatternLayout
+log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c{1}: %m%n
+log4j.appender.FA.Threshold = INFO
+
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.mortbay=WARN
+
+log4j.logger.BlockStateChange=WARN
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.apache.hadoop.hbase.ZNodeClearer=ERROR
\ No newline at end of file
diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/CatalogTest.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/CatalogTest.scala
new file mode 100644
index 0000000000000..4dc5e3bac44de
--- /dev/null
+++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/CatalogTest.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hbase
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.{HColumnDescriptor, TableName, HTableDescriptor}
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import org.apache.spark.sql.catalyst.types.{FloatType, BooleanType, IntegerType, StringType}
+import org.apache.spark.sql.hbase.HBaseCatalog.{Column, Columns, KeyColumn}
+import org.apache.spark.{Logging, SparkContext, _}
+import org.scalatest.{Ignore, BeforeAndAfterAll, FunSuite}
+
+/**
+ * Created by mengbo on 10/2/14.
+ */
+@Ignore
+class CatalogTest extends FunSuite with BeforeAndAfterAll with Logging {
+ var sparkConf: SparkConf = _
+ var sparkContext: SparkContext = _
+ var hbaseContext: HBaseSQLContext = _
+ var configuration: Configuration = _
+ var catalog: HBaseCatalog = _
+
+ override def beforeAll() = {
+ sparkConf = new SparkConf().setAppName("Catalog Test").setMaster("local[4]")
+ sparkContext = new SparkContext(sparkConf)
+ hbaseContext = new HBaseSQLContext(sparkContext)
+ configuration = hbaseContext.configuration
+ catalog = new HBaseCatalog(hbaseContext, configuration)
+ }
+
+ test("Create Table") {
+ // prepare the test data
+ val namespace = "testNamespace"
+ val tableName = "testTable"
+ val hbaseTableName = "hbaseTable"
+ val family1 = "family1"
+ val family2 = "family2"
+
+ val admin = new HBaseAdmin(configuration)
+ val desc = new HTableDescriptor(TableName.valueOf(hbaseTableName))
+ desc.addFamily(new HColumnDescriptor(family1))
+ desc.addFamily(new HColumnDescriptor(family2))
+ admin.createTable(desc)
+
+ val keyColumn1 = KeyColumn("column1", StringType)
+ val keyColumn2 = KeyColumn("column2", IntegerType)
+ var keyColumns = List[KeyColumn]()
+ keyColumns = keyColumns :+ keyColumn1
+ keyColumns = keyColumns :+ keyColumn2
+
+ val nonKeyColumn3 = Column("column3", family1, "qualifier1", BooleanType)
+ val nonKeyColumn4 = Column("column4", family2, "qualifier2", FloatType)
+ var nonKeyColumnList = List[Column]()
+ nonKeyColumnList = nonKeyColumnList :+ nonKeyColumn3
+ nonKeyColumnList = nonKeyColumnList :+ nonKeyColumn4
+ val nonKeyColumns = new Columns(nonKeyColumnList)
+
+ catalog.createTable(namespace, tableName, hbaseTableName, keyColumns, nonKeyColumns)
+ }
+
+ test("Get Table") {
+ // prepare the test data
+ val hbaseNamespace = "testNamespace"
+ val tableName = "testTable"
+ val hbaseTableName = "hbaseTable"
+
+ val oresult = catalog.getTable(tableName)
+ assert(oresult.isDefined)
+ val result = oresult.get
+ assert(result.tablename === tableName)
+ assert(result.hbaseTableName.tableName.getNameAsString === hbaseNamespace + ":" + hbaseTableName)
+ assert(result.colFamilies.size === 2)
+ assert(result.columns.columns.size === 2)
+
+ // check the data type
+ assert(result.rowKey.columns(0).dataType === StringType)
+ assert(result.rowKey.columns(1).dataType === IntegerType)
+ assert(result.columns.columns(0).dataType === BooleanType)
+ assert(result.columns.columns(1).dataType === FloatType)
+
+ val relation = catalog.lookupRelation(None, tableName)
+ val hbRelation = relation.asInstanceOf[HBaseRelation]
+ assert(hbRelation.colFamilies == Set("family1", "family2"))
+ assert(hbRelation.partitionKeys == Seq("column1", "column2"))
+ val rkColumns = new Columns(Seq(Column("column1", null, "column1", StringType, 1),
+ Column("column1", null, "column1", IntegerType, 2)))
+ assert(hbRelation.catalogTable.rowKeyColumns.equals(rkColumns))
+ assert(relation.childrenResolved)
+ }
+
+ test("Delete Table") {
+ // prepare the test data
+ val tableName = "testTable"
+
+ catalog.deleteTable(tableName)
+ }
+
+ test("Check Logical Table Exist") {
+ val tableName = "non-exist"
+
+ assert(catalog.checkLogicalTableExist(tableName) === false)
+ }
+}
diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBasicOperationSuite.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBasicOperationSuite.scala
new file mode 100644
index 0000000000000..81dad0bc13208
--- /dev/null
+++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseBasicOperationSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase
+
+import org.apache.spark.sql.QueryTest
+import org.scalatest.Ignore
+
+//Implicits
+
+import org.apache.spark.sql.hbase.TestHbase._
+
+@Ignore
+class HBaseBasicOperationSuite extends QueryTest {
+ TestData // Initialize TestData
+
+ test("create table") {
+ sql( """CREATE TABLE tableName (col1 STRING, col2 BYTE, col3 SHORT, col4 INTEGER,
+ col5 LONG, col6 FLOAT, col7 DOUBLE)
+ MAPPED BY (hbaseTableName, KEYS=[col7, col1, col3], COLS=[col2=cf1.cq11,
+ col4=cf1.cq12, col5=cf2.cq21, col6=cf2.cq22])""".stripMargin
+ )
+ }
+
+ test("Insert Into table") {
+ // sql("""CREATE TABLE t1 (t1c1 STRING, t1c2 STRING)
+ // MAPPED BY (ht1, KEYS=[t1c1], COLS=[t1c2=cf1.cq11])""".stripMargin
+ // )
+ // sql("""CREATE TABLE t2 (t2c1 STRING, t2c2 STRING)
+ // MAPPED BY (ht2, KEYS=[t2c1], COLS=[t2c2=cf2.cq21])""".stripMargin
+ // )
+ sql( """INSERT INTO t1 SELECT * FROM t2""".stripMargin)
+ }
+
+ test("Drop table") {
+ sql( """CREATE TABLE t1 (t1c1 STRING, t1c2 STRING)
+ MAPPED BY (ht1, KEYS=[t1c1], COLS=[t1c2=cf1.cq11])""".stripMargin
+ )
+ sql( """DROP TABLE t1""".stripMargin)
+ }
+
+ test("SPARK-3176 Added Parser of SQL ABS()") {
+ checkAnswer(
+ sql("SELECT ABS(-1.3)"),
+ 1.3)
+ }
+}
diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseIntegrationTest.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseIntegrationTest.scala
new file mode 100644
index 0000000000000..755e25ef0fd6c
--- /dev/null
+++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseIntegrationTest.scala
@@ -0,0 +1,216 @@
+package org.apache.spark.sql.hbase
+
+import java.sql.Timestamp
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client.{Result, Scan, HTable, HBaseAdmin}
+import org.apache.log4j.Logger
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.types.{IntegerType, StringType, LongType}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.test.TestSQLContext._
+import org.apache.spark.sql.{ReflectData, SQLContext, SchemaRDD}
+//import org.apache.spark.sql.hbase.TestHbase._
+import org.apache.spark.{SparkConf, Logging, SparkContext}
+import org.apache.spark.sql.hbase.HBaseCatalog.{KeyColumn, Columns, Column}
+import org.scalatest.{Ignore, BeforeAndAfterAll, BeforeAndAfter, FunSuite}
+import org.apache.hadoop.hbase.{HBaseConfiguration, HBaseTestingUtility, MiniHBaseCluster}
+
+/**
+ * HBaseIntegrationTest
+ * Created by sboesch on 9/27/14.
+ */
+@Ignore
+class HBaseIntegrationTest extends FunSuite with BeforeAndAfterAll with Logging {
+ @transient val logger = Logger.getLogger(getClass.getName)
+
+ val NMasters = 1
+ val NRegionServers = 3
+ val NDataNodes = 0
+
+ val NWorkers = 1
+
+ var cluster : MiniHBaseCluster = _
+ var config : Configuration = _
+ var hbaseAdmin : HBaseAdmin = _
+ var hbContext : HBaseSQLContext = _
+ var catalog : HBaseCatalog = _
+ var testUtil :HBaseTestingUtility = _
+
+// @inline def assert(p: Boolean, msg: String) = {
+// if (!p) {
+// throw new IllegalStateException(s"AssertionError: $msg")
+// }
+// }
+
+ override def beforeAll() = {
+ logger.info(s"Spin up hbase minicluster w/ $NMasters mast, $NRegionServers RS, $NDataNodes dataNodes")
+ testUtil = new HBaseTestingUtility
+// cluster = HBaseTestingUtility.createLocalHTU.
+// startMiniCluster(NMasters, NRegionServers, NDataNodes)
+// config = HBaseConfiguration.create
+ config = testUtil.getConfiguration
+ config.set("hbase.regionserver.info.port","-1")
+ config.set("hbase.master.info.port","-1")
+ cluster = testUtil.startMiniCluster(NMasters, NRegionServers)
+ println(s"# of region servers = ${cluster.countServedRegions}")
+ val conf = new SparkConf
+ val SparkPort = 11223
+ conf.set("spark.ui.port",SparkPort.toString)
+ val sc = new SparkContext(s"local[$NWorkers]", "HBaseTestsSparkContext", conf)
+ hbContext = new HBaseSQLContext(sc, config)
+ catalog = hbContext.catalog
+ hbaseAdmin = new HBaseAdmin(config)
+ }
+
+ test("Check the mini cluster for sanity") {
+ assert(cluster.countServedRegions == NRegionServers, "Region Servers incorrect")
+ println(s"# of region servers = ${cluster.countServedRegions}")
+ }
+
+ val DbName = "testdb"
+ val TabName = "testtaba"
+ val HbaseTabName = "hbasetaba"
+
+ test("Create a test table on the server") {
+
+// import hbContext.
+ val columns = new Columns(Array.tabulate[Column](10){ ax =>
+ Column(s"sqlColName$ax",s"cf${ax % 2}",s"cq${ax %2}ax",
+ if (ax % 2 == 0) LongType else StringType)
+ })
+ val keys = Array.tabulate(4){ ax =>
+ KeyColumn(s"sqlColName$ax",
+ if (ax % 2 == 0) LongType else StringType)
+ }.toSeq
+
+ catalog.createTable(DbName, TabName, HbaseTabName, keys, columns)
+
+ val metaTable = new HTable(config, HBaseCatalog.MetaData)
+ val scanner = metaTable.getScanner(new Scan())
+ import collection.mutable
+ var rows = new mutable.ArrayBuffer[Result]()
+ var row : Result = null
+ do {
+ row = scanner.next
+ if (row != null) {
+ rows += row
+ }
+ } while (row!=null)
+ assert(!rows.isEmpty, "Hey where did our metadata row go?")
+ val tname = rows(0).getColumnLatestCell(HBaseCatalog.ColumnFamily,
+ HBaseCatalog.QualKeyColumns)
+// assert(new String(tname.getQualifierArray).contains(HBaseCatalog.QualColumnInfo),
+// "We were unable to read the columnInfo cell")
+ val catTab = catalog.getTable(TabName)
+ assert(catTab.get.tablename == TabName)
+ // TODO(Bo, XinYu): fix parser/Catalog to support Namespace=Dbname
+ assert(catTab.get.hbaseTableName.toString == s"$DbName:$HbaseTabName")
+ }
+
+ test("ReflectData from spark tests suite") {
+ val data = ReflectData("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true,
+ BigDecimal(1), new Timestamp(12345), Seq(1,2,3))
+ val rdd = sparkContext.parallelize(data :: Nil)
+ rdd.registerTempTable("reflectData")
+
+ assert(sql("SELECT * FROM reflectData").collect().head === data.productIterator.toSeq)
+
+// ctx.sql(
+// s"""insert into $TabName select * from $TempTabName""".stripMargin)
+//
+// ctx.sql(s"""select * from $TabName
+// where col1 >=3 and col1 <= 10
+// order by col1 desc"""
+// .stripMargin)
+
+ }
+
+ test("get table") {
+ // prepare the test data
+ val namespace = "testNamespace"
+ val tableName = "testTable"
+ val hbaseTableName = "hbaseTable"
+
+ val oresult = catalog.getTable(tableName)
+ assert(oresult.isDefined)
+ val result = oresult.get
+ assert(result.tablename == tableName)
+ assert(result.hbaseTableName.tableName.getNameAsString == namespace + ":" + hbaseTableName)
+ assert(result.colFamilies.size === 2)
+ assert(result.columns.columns.size === 2)
+ val relation = catalog.lookupRelation(None, tableName)
+ val hbRelation = relation.asInstanceOf[HBaseRelation]
+ assert(hbRelation.colFamilies == Set("family1", "family2"))
+ assert(hbRelation.partitionKeys == Seq("column1", "column2"))
+ val rkColumns = new Columns(Seq(Column("column1",null, "column1", StringType,1),
+ Column("column1",null, "column1", IntegerType,2)))
+ assert(hbRelation.catalogTable.rowKeyColumns.equals(rkColumns))
+ assert(relation.childrenResolved)
+ }
+
+ case class MyTable(col1: String, col2: Byte, col3: Short, col4: Int, col5: Long,
+ col6: Float, col7: Double)
+
+ test("Insert data into the test table using applySchema") {
+
+ val DbName = "mynamespace"
+ val TabName = "myTable"
+ hbContext.sql(s"""CREATE TABLE $DbName.$TabName(col1 STRING, col2 BYTE, col3 SHORT, col4 INTEGER,
+ col5 LONG, col6 FLOAT, col7 DOUBLE)
+ MAPPED BY (hbaseTableName, KEYS=[col7, col1, col3], COLS=[col2=cf1.cq11,
+ col4=cf1.cq12, col5=cf2.cq21, col6=cf2.cq22])"""
+ .stripMargin)
+
+ val catTab = catalog.getTable(TabName)
+ assert(catTab.get.tablename == TabName)
+
+ val ctx = hbContext
+ import ctx.createSchemaRDD
+ val myRows = ctx.sparkContext.parallelize(Range(1,21).map{ix =>
+ MyTable(s"col1$ix", ix.toByte, (ix.toByte*256).asInstanceOf[Short],ix.toByte*65536, ix.toByte*65563L*65536L,
+ (ix.toByte*65536.0).asInstanceOf[Float], ix.toByte*65536.0D*65563.0D)
+ })
+
+// import org.apache.spark.sql.execution.ExistingRdd
+// val myRowsSchema = ExistingRdd.productToRowRdd(myRows)
+// ctx.applySchema(myRowsSchema, schema)
+ val TempTabName = "MyTempTab"
+ myRows.registerTempTable(TempTabName)
+
+ // ctx.sql(
+ // s"""insert into $TabName select * from $TempTabName""".stripMargin)
+
+ val hbRelation = catalog.lookupRelation(Some(DbName), TabName).asInstanceOf[HBaseRelation]
+
+ val hbasePlanner = new SparkPlanner with HBaseStrategies {
+ @transient override val hbaseContext: HBaseSQLContext = hbContext
+ }
+
+ val myRowsSchemaRdd = hbContext.createSchemaRDD(myRows)
+ val insertPlan = hbasePlanner.InsertIntoHBaseTableFromRdd(hbRelation,
+ myRowsSchemaRdd)(hbContext)
+
+ val insertRdd = insertPlan.execute.collect
+
+ ctx.sql( s"""select * from $TabName
+ where col1 >=3 and col1 <= 10
+ order by col1 desc"""
+ .stripMargin)
+
+ }
+
+ test("Run a simple query") {
+ // ensure the catalog exists (created in the "Create a test table" test)
+ val catTab = catalog.getTable(TabName).get
+ assert(catTab.tablename == TabName)
+ val rdd = hbContext.sql(s"select * from $TabName")
+ rdd.take(1)
+
+ }
+
+ override def afterAll() = {
+ cluster.shutdown
+ }
+
+}
diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseMainTest.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseMainTest.scala
new file mode 100644
index 0000000000000..8499827ca0ef6
--- /dev/null
+++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseMainTest.scala
@@ -0,0 +1,422 @@
+package org.apache.spark.sql.hbase
+
+import java.io.{ObjectOutputStream, ByteArrayOutputStream, DataOutputStream}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.client._
+import org.apache.log4j.Logger
+import org.apache.spark
+import org.apache.spark.sql.SchemaRDD
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.types.{DoubleType, ShortType, StringType}
+import org.apache.spark.sql.hbase.DataTypeUtils._
+import org.apache.spark.sql.hbase.HBaseCatalog.{Column, Columns}
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.test.TestSQLContext._
+import org.apache.spark.{Logging, SparkConf, sql}
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import spark.sql.Row
+
+/**
+ * HBaseIntegrationTest
+ * Created by sboesch on 9/27/14.
+ */
+object HBaseMainTest extends FunSuite with BeforeAndAfterAll with Logging {
+ @transient val logger = Logger.getLogger(getClass.getName)
+
+ val useMiniCluster: Boolean = false
+
+ val NMasters = 1
+ val NRegionServers = 1
+ // 3
+ val NDataNodes = 0
+
+ val NWorkers = 1
+
+ @transient var cluster: MiniHBaseCluster = null
+ @transient var config: Configuration = null
+ @transient var hbaseAdmin: HBaseAdmin = null
+ @transient var hbContext: HBaseSQLContext = null
+ @transient var catalog: HBaseCatalog = null
+ @transient var testUtil: HBaseTestingUtility = null
+
+ case class MyTable(col1: String, col2: Byte, col3: Short, col4: Int, col5: Long,
+ col6: Float, col7: Double)
+
+ val DbName = "mynamespace"
+ val TabName = "myTable"
+ val HbaseTabName = "hbasetaba"
+
+ def ctxSetup() {
+ if (useMiniCluster) {
+ logger.info(s"Spin up hbase minicluster w/ $NMasters mast, $NRegionServers RS, $NDataNodes dataNodes")
+ testUtil = new HBaseTestingUtility
+ config = testUtil.getConfiguration
+ } else {
+ config = HBaseConfiguration.create
+ }
+ // cluster = HBaseTestingUtility.createLocalHTU.
+ // startMiniCluster(NMasters, NRegionServers, NDataNodes)
+ // config = HBaseConfiguration.create
+ config.set("hbase.regionserver.info.port", "-1")
+ config.set("hbase.master.info.port", "-1")
+ config.set("dfs.client.socket-timeout", "240000")
+ config.set("dfs.datanode.socket.write.timeout", "240000")
+ config.set("zookeeper.session.timeout", "240000")
+ config.set("zookeeper.minSessionTimeout", "10")
+ config.set("zookeeper.tickTime", "10")
+ config.set("hbase.rpc.timeout", "240000")
+ config.set("ipc.client.connect.timeout", "240000")
+ config.set("dfs.namenode.stale.datanode.interva", "240000")
+ config.set("hbase.rpc.shortoperation.timeout", "240000")
+ config.set("hbase.regionserver.lease.period", "240000")
+
+ if (useMiniCluster) {
+ cluster = testUtil.startMiniCluster(NMasters, NRegionServers)
+ println(s"# of region servers = ${cluster.countServedRegions}")
+ }
+
+ @transient val conf = new SparkConf
+ val SparkPort = 11223
+ conf.set("spark.ui.port", SparkPort.toString)
+ // @transient val sc = new SparkContext(s"local[$NWorkers]", "HBaseTestsSparkContext", conf)
+ hbContext = new HBaseSQLContext(TestSQLContext.sparkContext, config)
+
+ catalog = hbContext.catalog
+ hbaseAdmin = new HBaseAdmin(config)
+
+ }
+
+ def tableSetup() = {
+ createTable()
+ }
+
+ def createTable() = {
+
+ if (useMiniCluster) {
+ try {
+ hbContext.sql( s"""CREATE TABLE $TabName(col1 STRING, col2 BYTE, col3 SHORT, col4 INTEGER,
+ col5 LONG, col6 FLOAT, col7 DOUBLE)
+ MAPPED BY ($HbaseTabName, KEYS=[col7, col1, col3], COLS=[col2=cf1.cq11,
+ col4=cf1.cq12, col5=cf2.cq21, col6=cf2.cq22])"""
+ .stripMargin)
+ } catch {
+ case e: TableExistsException =>
+ e.printStackTrace
+ }
+
+ try {
+ val hdesc = new HTableDescriptor(TableName.valueOf(HbaseTabName))
+ Array(new HColumnDescriptor("cf1"), new HColumnDescriptor("cf2")).foreach { f =>
+ hdesc.addFamily(f)
+ }
+ hbaseAdmin.createTable(hdesc)
+ } catch {
+ case e: TableExistsException =>
+ e.printStackTrace
+ }
+ }
+
+ if (!hbaseAdmin.tableExists(HbaseTabName)) {
+ throw new IllegalArgumentException("where is our table?")
+ }
+
+ }
+
+ def testGetTable = {
+ println("get table")
+ // prepare the test data
+ HBaseCatalog.getKeysFromAllMetaTableRows(config)
+ .foreach { r => logger.info(s"Metatable Rowkey: ${new String(r)}")}
+
+ val oresult = catalog.getTable(TabName)
+ assert(oresult.isDefined)
+ val result = oresult.get
+ assert(result.tablename == TabName)
+ assert(result.hbaseTableName.tableName.getNameAsString == DbName + ":" + HbaseTabName)
+ assert(result.colFamilies.size == 2)
+ assert(result.columns.columns.size == 4)
+ assert(result.rowKeyColumns.columns.size == 3)
+ val relation = catalog.lookupRelation(Some(DbName), TabName)
+ val hbRelation = relation.asInstanceOf[HBaseRelation]
+ assert(hbRelation.colFamilies == Seq("cf1", "cf2"))
+ assert(Seq("col7", "col1", "col3").zip(hbRelation.partitionKeys)
+ .forall { x => x._1 == x._2.name})
+ val rkColumns = new Columns(Seq(Column("col7", null, "col7", DoubleType),
+ Column("col1", null, "col1", StringType),
+ Column("col3", null, "col3", ShortType)))
+ assert(hbRelation.catalogTable.rowKeyColumns.equals(rkColumns))
+ assert(relation.childrenResolved)
+ }
+
+ def checkHBaseTableExists(hbaseTable: String) = {
+ hbaseAdmin.listTableNames.foreach { t => println(s"table: $t")}
+ val tname = TableName.valueOf(hbaseTable)
+ hbaseAdmin.tableExists(tname)
+ }
+
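+ // Writes two rows through the raw HBase client API: makeRowKey encodes the key
+ // columns (col7, col1, col3) and addRowVals adds the non-key columns as cell values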
+ def insertTestData() = {
+ if (!checkHBaseTableExists(HbaseTabName)) {
+ throw new IllegalStateException(s"Unable to find table ${HbaseTabName}")
+ }
+ val htable = new HTable(config, HbaseTabName)
+
+ var put = new Put(makeRowKey(12345.0, "Michigan", 12345))
+ addRowVals(put, (123).toByte, 12345678, 12345678901234L, 1234.5678F)
+ htable.put(put)
+ put = new Put(makeRowKey(456789.0, "Michigan", 4567))
+ addRowVals(put, (45).toByte, 456789012, 4567890123446789L, 456.78901F) // col2 must fit in a signed Byte
+ htable.put(put)
+ htable.close
+
+ }
+
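+ // Gates the optional WHERE-filter and aggregate queries in testQuery below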
+ val runMultiTests: Boolean = false
+
+ def testQuery() {
+ ctxSetup()
+ createTable()
+ // testInsertIntoTable
+ // testHBaseScanner
+
+ if (!checkHBaseTableExists(HbaseTabName)) {
+ throw new IllegalStateException(s"Unable to find table ${HbaseTabName}")
+ }
+
+ insertTestData
+
+ var results: SchemaRDD = null
+ var data: Array[sql.Row] = null
+
+ results = hbContext.sql( s"""SELECT * FROM $TabName """.stripMargin)
+ printResults("Star* operator", results)
+ data = results.collect
+ assert(data.size >= 2)
+
+ results = hbContext.sql(
+ s"""SELECT col3, col1, col7 FROM $TabName LIMIT 1
+ """.stripMargin)
+ printResults("Limit Op", results)
+ data = results.collect
+ assert(data.size == 1)
+
+ results = hbContext.sql(
+ s"""SELECT col3, col2, col1, col4, col7 FROM $TabName order by col7 desc
+ """.stripMargin)
+ printResults("Ordering with nonkey columns", results)
+ data = results.collect
+ assert(data.size >= 2)
+
+ try {
+ results = hbContext.sql(
+ s"""SELECT col3, col1, col7 FROM $TabName LIMIT 1
+ """.stripMargin)
+ printResults("Limit Op", results)
+ } catch {
+ case e: Exception => "Query with Limit failed"
+ e.printStackTrace
+ }
+
+ results = hbContext.sql( s"""SELECT col3, col1, col7 FROM $TabName ORDER by col7 DESC
+ """.stripMargin)
+ printResults("Order by", results)
+
+ if (runMultiTests) {
+ results = hbContext.sql( s"""SELECT col3, col2, col1, col7, col4 FROM $TabName
+ WHERE col1 ='Michigan'
+ """.stripMargin)
+ printResults("Where/filter on rowkey", results)
+ data = results.collect
+ assert(data.size >= 1)
+
+ results = hbContext.sql( s"""SELECT col7, col3, col2, col1, col4 FROM $TabName
+ WHERE col1 ='Michigan' and col7 >= 2500.0 and col3 >= 3500 and col3 <= 5000
+ """.stripMargin)
+ printResults("Where/filter on rowkeys change", results)
+
+ results = hbContext.sql( s"""SELECT col3, col2, col1, col7, col4 FROM $TabName
+ WHERE col1 ='Michigan' and col7 >= 2500.0 and col3 >= 3500 and col3 <= 5000
+ """.stripMargin)
+ printResults("Where/filter on rowkeys", results)
+
+
+ results = hbContext.sql( s"""SELECT col1, col3, col7 FROM $TabName
+ WHERE col1 ='Michigan' and col7 >= 2500.0 and col3 >= 35 and col3 <= 50 and col3 != 7.0
+ """.stripMargin)
+ printResults("Where with notequal", results)
+
+ results = hbContext.sql( s"""SELECT col1, col2, col3, col7 FROM $TabName
+ WHERE col1 ='Michigan' and col7 >= 2500.0 and col3 >= 35 and col3 <= 50 and cast(col2 as double) != 7.0
+ """.stripMargin)
+ printResults("Include non-rowkey cols in project", results)
+ }
+ if (runMultiTests) {
+ results = hbContext.sql( s"""SELECT col1, col2, col3, col7 FROM $TabName
+ WHERE col1 ='Michigan' and col7 >= 2500.0 and col3 >= 35 and col3 <= 50 and col2 != 7.0
+ """.stripMargin)
+ printResults("Include non-rowkey cols in filter", results)
+
+ results = hbContext.sql( s"""SELECT sum(col3) as col3sum, col1, col3 FROM $TabName
+ WHERE col1 ='Michigan' and col7 >= 2500.0 and col3 >= 35 and col3 <= 50 and col2 != 7.0
+ group by col1, col3
+ """.stripMargin)
+ printResults("Aggregates on rowkeys", results)
+
+
+ results = hbContext.sql( s"""SELECT sum(col2) as col2sum, col4, col1, col3, col2 FROM $TabName
+ WHERE col1 ='Michigan' and col7 >= 2500.0 and col3 >= 35 and col3 <= 50
+ group by col1, col2, col4, col3
+ """.stripMargin)
+ printResults("Aggregates on non-rowkeys", results)
+ }
+ }
+
+ def printResults(msg: String, results: SchemaRDD) = {
+ if (results.isInstanceOf[TestingSchemaRDD]) {
+ val data = results.asInstanceOf[TestingSchemaRDD].collectPartitions
+ println(s"For test [$msg]: Received data length=${data(0).length}: ${
+ data(0).mkString("RDD results: {", "],[", "}")
+ }")
+ } else {
+ val data = results.collect
+ println(s"For test [$msg]: Received data length=${data.length}: ${
+ data.mkString("RDD results: {", "],[", "}")
+ }")
+ }
+
+ }
+
+ def createTableTest2() {
+ ctxSetup()
+ // Following fails with Unresolved:
+ // Col1 Sort is unresolved
+ // Col4 and col2 Aggregation are unresolved (interesting col3 IS resolved)
+ // val results = hbContext.sql(s"""SELECT col4, col1, col3, col2 FROM $TabName
+ // WHERE col1 ='Michigan' and col7 >= 2500.0 and col3 >= 35 and col3 <= 50 group by col7, col1
+ // ORDER BY col1 DESC"""
+ // .stripMargin)
+
+ hbContext.sql( s"""CREATE TABLE $DbName.$TabName(col1 STRING, col2 BYTE, col3 SHORT, col4 INTEGER,
+ col5 LONG, col6 FLOAT, col7 DOUBLE)
+ MAPPED BY ($HbaseTabName, KEYS=[col7, col1, col3], COLS=[col2=cf1.cq11,
+ col4=cf1.cq12, col5=cf2.cq21, col6=cf2.cq22])"""
+ .stripMargin)
+
+ val catTab = catalog.getTable(TabName)
+ assert(catTab.get.tablename == TabName)
+
+ testGetTable
+ }
+
+ def testInsertIntoTable() = {
+ logger.info("Insert data into the test table using applySchema")
+ ctxSetup()
+ tableSetup()
+ // import hbContext.createSchemaRDD
+ val myRows = hbContext.sparkContext.parallelize(Range(1, 21).map { ix =>
+ MyTable(s"Michigan", ix.toByte, (ix.toByte * 256).asInstanceOf[Short], ix.toByte * 65536, ix.toByte * 65563L * 65536L,
+ (ix.toByte * 65536.0).asInstanceOf[Float], ix.toByte * 65536.0D * 65563.0D)
+ })
+
+ // import org.apache.spark.sql.execution.ExistingRdd
+ // val myRowsSchema = ExistingRdd.productToRowRdd(myRows)
+ // hbContext.applySchema(myRowsSchema, schema)
+ val TempTabName = "MyTempTab"
+ myRows.registerTempTable(TempTabName)
+
+ val localData = myRows.collect
+
+ hbContext.sql(
+ s"""insert into $TabName select * from $TempTabName""".stripMargin)
+
+ val hbRelation = catalog.lookupRelation(Some(DbName), TabName).asInstanceOf[HBaseRelation]
+
+ val hbasePlanner = new SparkPlanner with HBaseStrategies {
+ @transient override val hbaseContext: HBaseSQLContext = hbContext
+ }
+
+ val myRowsSchemaRdd = hbContext.createSchemaRDD(myRows)
+ val insertPlan = hbasePlanner.InsertIntoHBaseTableFromRdd(hbRelation,
+ myRowsSchemaRdd)(hbContext)
+
+ val rowKeysWithRows = myRowsSchemaRdd.zip(
+ HBaseRelation.rowKeysFromRows(myRowsSchemaRdd, hbRelation))
+ // var keysCollect = rowKeysWithRows.collect
+ HBaseStrategies.putToHBase(myRows, hbRelation, hbContext)
+
+ val preparedInsertRdd = insertPlan.execute
+ val executedInsertRdd = preparedInsertRdd.collect
+
+ val rowsRdd = myRowsSchemaRdd
+ val rowKeysWithRows2 = rowsRdd.zip(
+ HBaseRelation.rowKeysFromRows(rowsRdd, hbRelation))
+ HBaseStrategies.putToHBase(rowsRdd, hbRelation, hbContext)
+
+
+ if (cluster != null) cluster.shutdown()
+ }
+
+ import org.apache.spark.sql.hbase.HBaseRelation.RowKeyParser
+
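+ // Encodes a version-1 composite row key: version byte, the key columns in order
+ // (Double, String, Short), then one 2-byte offset per key column, and a trailing
+ // dimension-count byte, matching the layout RowKeyParser.parseRowKey decodes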
+ def makeRowKey(col7: Double, col1: String, col3: Short) = {
+ val size = 1 + sizeOf(col7) + sizeOf(col1) + sizeOf(col3) + 3 * 2 +
+ RowKeyParser.DimensionCountLen
+ // val barr = new Array[Byte](size)
+ val bos = new ByteArrayOutputStream(size)
+ val dos = new DataOutputStream(bos)
+ dos.writeByte(HBaseRelation.RowKeyParser.Version1)
+ dos.writeDouble(col7)
+ dos.writeBytes(col1)
+ dos.writeShort(col3)
+ var off = 1
+ dos.writeShort(off)
+ off += sizeOf(col7)
+ dos.writeShort(off)
+ off += sizeOf(col1)
+ dos.writeShort(off)
+ dos.writeByte(3.toByte)
+ val s = bos.toString
+ // println((s"MakeRowKey: [${RowKeyParser.show(bos.toByteArray)}]")
+ println(s"MakeRowKey: [${s}]")
+ bos.toByteArray
+ }
+
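+ // Serializes each non-key column and adds it to the Put under its mapped
+ // family/qualifier (col2=cf1.cq11, col4=cf1.cq12, col5=cf2.cq21, col6=cf2.cq22),
+ // mirroring the COLS clause of the CREATE TABLE statement above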
+ def addRowVals(put: Put, col2: Byte, col4: Int, col5: Long, col6: Float) = {
+ // val barr = new Array[Byte](size)
+ var bos = new ByteArrayOutputStream()
+ var dos = new DataOutputStream(bos)
+ dos.writeByte(col2)
+ put.add(s2b("cf1"), s2b("cq11"), bos.toByteArray)
+ bos = new ByteArrayOutputStream()
+ dos = new DataOutputStream(bos)
+ dos.writeInt(col4)
+ put.add(s2b("cf1"), s2b("cq12"), bos.toByteArray)
+ bos = new ByteArrayOutputStream()
+ dos = new DataOutputStream(bos)
+ dos.writeLong(col5)
+ put.add(s2b("cf2"), s2b("cq21"), bos.toByteArray)
+ bos = new ByteArrayOutputStream()
+ dos = new DataOutputStream(bos)
+ dos.writeFloat(col6)
+ put.add(s2b("cf2"), s2b("cq22"), bos.toByteArray)
+ }
+
+ def testHBaseScanner() = {
+ val scan = new Scan
+ val htable = new HTable(config, HbaseTabName)
+ val scanner = htable.getScanner(scan)
+ var res: Result = null
+ do {
+ res = scanner.next
+ if (res != null) println(s"Row ${res.getRow} has map=${res.getNoVersionMap.toString}")
+ } while (res != null)
+ }
+
+ def main(args: Array[String]) = {
+ // testInsertIntoTable
+ testQuery
+ }
+
+}
diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseTestingSparkContext.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseTestingSparkContext.scala
new file mode 100644
index 0000000000000..d19cd8ef6a14e
--- /dev/null
+++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/HBaseTestingSparkContext.scala
@@ -0,0 +1,31 @@
+package org.apache.spark.sql.hbase
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SharedSparkContext}
+import org.apache.log4j.Logger
+import org.scalatest.{BeforeAndAfterAll, Suite}
+
+/**
+ * HBaseTestingSparkContext. Modeled after SharedSparkContext
+ *
+ * Created by sboesch on 9/28/14.
+ */
+class HBaseTestingSparkContext(nSlaves: Int) /* extends BeforeAndAfterAll */ {
+ self: Suite =>
+ @transient val logger = Logger.getLogger(getClass.getName)
+ @transient private var _sc: SparkContext = _
+
+ def sc: SparkContext = _sc
+
+ var conf = new SparkConf(false)
+
+// val NSlaves = 2
+ val slaves = s"local[$nSlaves]"
+ def beforeAll() {
+ _sc = new SparkContext(slaves, "test", conf)
+ }
+
+ def afterAll() {
+ LocalSparkContext.stop(_sc)
+ _sc = null
+ }
+}
diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/RowKeyParserSuite.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/RowKeyParserSuite.scala
new file mode 100644
index 0000000000000..1ac62473ce8bd
--- /dev/null
+++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/RowKeyParserSuite.scala
@@ -0,0 +1,98 @@
+package org.apache.spark.sql.hbase
+
+import java.io.{ByteArrayOutputStream, DataOutputStream}
+
+import org.apache.log4j.Logger
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.hbase.DataTypeUtils._
+import org.apache.spark.sql.hbase.HBaseCatalog.{Column, Columns}
+import org.scalatest.{FunSuite, ShouldMatchers}
+
+/**
+ * RowKeyParserSuite: tests composite row-key encoding and parsing
+ * Created by sboesch on 9/25/14.
+ */
+
+case class TestCall(callId: Int, userId: String, duration: Double)
+
+class RowKeyParserSuite extends FunSuite with ShouldMatchers {
+ @transient val logger = Logger.getLogger(getClass.getName)
+
+ import org.apache.spark.sql.hbase.HBaseRelation.RowKeyParser
+
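+ // Builds a version-1 composite key (Double, String, Short) plus per-column
+ // 2-byte offsets and a trailing dimension-count byte, the same layout that
+ // parseRowKey and parseRowKeyWithMetaData decode in the tests below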
+ def makeRowKey(col7: Double, col1: String, col3: Short) = {
+ val size = 1 + sizeOf(col7) + sizeOf(col1) + sizeOf(col3) + 3 * 2 +
+ RowKeyParser.DimensionCountLen
+ // val barr = new Array[Byte](size)
+ val bos = new ByteArrayOutputStream(size)
+ val dos = new DataOutputStream(bos)
+ dos.writeByte(RowKeyParser.Version1)
+ dos.writeDouble(col7)
+ dos.writeBytes(col1)
+ dos.writeShort(col3)
+ var off = 1
+ dos.writeShort(off)
+ off += sizeOf(col7)
+ dos.writeShort(off)
+ off += sizeOf(col1)
+ dos.writeShort(off)
+ dos.writeByte(3.toByte)
+ val s = bos.toString
+ // println((s"MakeRowKey: [${RowKeyParser.show(bos.toByteArray)}]")
+ println(s"MakeRowKey: [${s}]")
+ bos.toByteArray
+ }
+
+ test("rowkey test") {
+
+ val cols = Range(0, 3).zip(Seq(DoubleType, StringType, ShortType))
+ .map { case (ix, dataType) =>
+ Column(s"col{ix+10}", s"cf${ix + 1}", s"cq${ix + 10}", dataType)
+ }.toSeq
+
+ val pat = makeRowKey(12345.6789, "Column1-val", 12345)
+ val parsedKeyMap = RowKeyParser.parseRowKeyWithMetaData(cols, pat)
+ println(s"parsedKeyWithMetaData: ${parsedKeyMap.toString}")
+ // assert(parsedKeyMap === Map("col7" ->(12345.6789, "col1" -> "Column1-val", "col3" -> 12345)))
+ // assert(parsedKeyMap.values.toList.sorted === List(12345.6789, "Column1-val",12345))
+
+ val parsedKey = RowKeyParser.parseRowKey(pat)
+ println(s"parsedRowKey: ${parsedKey.toString}")
+
+ }
+
+ test("CreateKeyFromCatalystRow") {
+ import org.apache.spark.sql.catalyst.types._
+ val schema: StructType = new StructType(Seq(
+ new StructField("callId", IntegerType, false),
+ new StructField("userId", StringType, false),
+ new StructField("cellTowers", StringType, true),
+ new StructField("callType", ByteType, false),
+ new StructField("deviceId", LongType, false),
+ new StructField("duration", DoubleType, false))
+ )
+
+ val keyCols = new Columns(Seq(
+ Column("userId", "cf1", "useridq", StringType),
+ Column("callId", "cf1", "callidq", IntegerType),
+ Column("deviceId", "cf2", "deviceidq", LongType)
+ ))
+ // val cols = new Columns(Seq(
+ // Column("cellTowers","cf2","cellTowersq",StringType),
+ // Column("callType","cf1","callTypeq",ByteType),
+ // Column("duration","cf2","durationq",DoubleType)
+ // ))
+ val row = Row(12345678, "myUserId1", "tower1,tower9,tower3", 22.toByte, 111223445L, 12345678.90123)
+ val key = RowKeyParser.createKeyFromCatalystRow(schema, keyCols, row)
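+ // Expected length: 1 (version) + 9 ("myUserId1") + 4 (Int callId) + 8 (Long deviceId)
+ // + 3 * 2 (offsets) + 1 (dimension count) = 29 bytes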
+ assert(key.length == 29)
+ val parsedKey = RowKeyParser.parseRowKey(key)
+ assert(parsedKey.length == 3)
+ import org.apache.spark.sql.hbase.DataTypeUtils.cast
+ assert(cast(parsedKey(0), StringType) == "myUserId1")
+ assert(cast(parsedKey(1), IntegerType) == 12345678)
+ assert(cast(parsedKey(2), LongType) == 111223445L)
+
+ }
+
+}
diff --git a/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/TestData.scala b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/TestData.scala
new file mode 100644
index 0000000000000..89f537310aebf
--- /dev/null
+++ b/sql/hbase/src/test/scala/org/apache/spark/sql/hbase/TestData.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hbase
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.test._
+
+/* Implicits */
+import org.apache.spark.sql.test.TestSQLContext._
+
+case class TestData(key: Int, value: String)
+
+object TestData {
+// val testData: SchemaRDD = TestSQLContext.sparkContext.parallelize(
+// (1 to 100).map(i => TestData(i, i.toString)))
+// testData.registerTempTable("testData")
+//
+// case class LargeAndSmallInts(a: Int, b: Int)
+// val largeAndSmallInts: SchemaRDD =
+// TestSQLContext.sparkContext.parallelize(
+// LargeAndSmallInts(2147483644, 1) ::
+// LargeAndSmallInts(1, 2) ::
+// LargeAndSmallInts(2147483645, 1) ::
+// LargeAndSmallInts(2, 2) ::
+// LargeAndSmallInts(2147483646, 1) ::
+// LargeAndSmallInts(3, 2) :: Nil)
+// largeAndSmallInts.registerTempTable("largeAndSmallInts")
+//
+// case class TestData2(a: Int, b: Int)
+// val testData2: SchemaRDD =
+// TestSQLContext.sparkContext.parallelize(
+// TestData2(1, 1) ::
+// TestData2(1, 2) ::
+// TestData2(2, 1) ::
+// TestData2(2, 2) ::
+// TestData2(3, 1) ::
+// TestData2(3, 2) :: Nil)
+// testData2.registerTempTable("testData2")
+
+ // TODO: There is no way to express null primitives as case classes currently...
+ val testData3 =
+ logical.LocalRelation('a.int, 'b.int).loadData(
+ (1, null) ::
+ (2, 2) :: Nil)
+
+ val emptyTableData = logical.LocalRelation('a.int, 'b.int)
+
+ case class UpperCaseData(N: Int, L: String)
+ val upperCaseData =
+ TestSQLContext.sparkContext.parallelize(
+ UpperCaseData(1, "A") ::
+ UpperCaseData(2, "B") ::
+ UpperCaseData(3, "C") ::
+ UpperCaseData(4, "D") ::
+ UpperCaseData(5, "E") ::
+ UpperCaseData(6, "F") :: Nil)
+ upperCaseData.registerTempTable("upperCaseData")
+
+ case class LowerCaseData(n: Int, l: String)
+ val lowerCaseData =
+ TestSQLContext.sparkContext.parallelize(
+ LowerCaseData(1, "a") ::
+ LowerCaseData(2, "b") ::
+ LowerCaseData(3, "c") ::
+ LowerCaseData(4, "d") :: Nil)
+ lowerCaseData.registerTempTable("lowerCaseData")
+
+ case class ArrayData(data: Seq[Int], nestedData: Seq[Seq[Int]])
+ val arrayData =
+ TestSQLContext.sparkContext.parallelize(
+ ArrayData(Seq(1,2,3), Seq(Seq(1,2,3))) ::
+ ArrayData(Seq(2,3,4), Seq(Seq(2,3,4))) :: Nil)
+ arrayData.registerTempTable("arrayData")
+
+ case class MapData(data: Map[Int, String])
+ val mapData =
+ TestSQLContext.sparkContext.parallelize(
+ MapData(Map(1 -> "a1", 2 -> "b1", 3 -> "c1", 4 -> "d1", 5 -> "e1")) ::
+ MapData(Map(1 -> "a2", 2 -> "b2", 3 -> "c2", 4 -> "d2")) ::
+ MapData(Map(1 -> "a3", 2 -> "b3", 3 -> "c3")) ::
+ MapData(Map(1 -> "a4", 2 -> "b4")) ::
+ MapData(Map(1 -> "a5")) :: Nil)
+ mapData.registerTempTable("mapData")
+
+ case class StringData(s: String)
+ val repeatedData =
+ TestSQLContext.sparkContext.parallelize(List.fill(2)(StringData("test")))
+ repeatedData.registerTempTable("repeatedData")
+
+ val nullableRepeatedData =
+ TestSQLContext.sparkContext.parallelize(
+ List.fill(2)(StringData(null)) ++
+ List.fill(2)(StringData("test")))
+ nullableRepeatedData.registerTempTable("nullableRepeatedData")
+
+ case class NullInts(a: Integer)
+ val nullInts =
+ TestSQLContext.sparkContext.parallelize(
+ NullInts(1) ::
+ NullInts(2) ::
+ NullInts(3) ::
+ NullInts(null) :: Nil
+ )
+ nullInts.registerTempTable("nullInts")
+
+ val allNulls =
+ TestSQLContext.sparkContext.parallelize(
+ NullInts(null) ::
+ NullInts(null) ::
+ NullInts(null) ::
+ NullInts(null) :: Nil)
+ allNulls.registerTempTable("allNulls")
+
+ case class NullStrings(n: Int, s: String)
+ val nullStrings =
+ TestSQLContext.sparkContext.parallelize(
+ NullStrings(1, "abc") ::
+ NullStrings(2, "ABC") ::
+ NullStrings(3, null) :: Nil)
+ nullStrings.registerTempTable("nullStrings")
+
+ case class TableName(tableName: String)
+ TestSQLContext.sparkContext.parallelize(TableName("test") :: Nil).registerTempTable("tableName")
+
+ val unparsedStrings =
+ TestSQLContext.sparkContext.parallelize(
+ "1, A1, true, null" ::
+ "2, B2, false, null" ::
+ "3, C3, true, null" ::
+ "4, D4, true, 2147483644" :: Nil)
+
+ case class TimestampField(time: Timestamp)
+ val timestamps = TestSQLContext.sparkContext.parallelize((1 to 3).map { i =>
+ TimestampField(new Timestamp(i))
+ })
+ timestamps.registerTempTable("timestamps")
+
+ case class IntField(i: Int)
+ // An RDD with 4 elements and 8 partitions
+ val withEmptyParts = TestSQLContext.sparkContext.parallelize((1 to 4).map(IntField), 8)
+ withEmptyParts.registerTempTable("withEmptyParts")
+}
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 8a7035c85e9f1..137a11f24f2c9 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -99,6 +99,7 @@
org.apache.maven.plugins
maven-install-plugin
+
true