@@ -0,0 +1,259 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hbase;

import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.util.Arrays;
import java.util.List;
import scala.Serializable;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
* Copied from HBase's ImmutableBytesWritable so that instances are Serializable for use in Spark.
*/
public class SparkImmutableBytesWritable
implements WritableComparable<SparkImmutableBytesWritable>, Serializable {
private byte[] bytes;
private int offset;
private int length;

/**
* Create a zero-size sequence.
*/
public SparkImmutableBytesWritable() {
super();
}

/**
* Create a SparkImmutableBytesWritable using the byte array as the initial value.
* @param bytes This array becomes the backing storage for the object.
*/
public SparkImmutableBytesWritable(byte[] bytes) {
this(bytes, 0, bytes.length);
}

/**
* Create a new SparkImmutableBytesWritable from the contents of the passed
* <code>ibw</code>. The backing array is shared, not copied.
* @param ibw the instance to copy the byte range from.
*/
public SparkImmutableBytesWritable(final SparkImmutableBytesWritable ibw) {
this(ibw.get(), ibw.getOffset(), ibw.getLength());
}

/**
* Create a SparkImmutableBytesWritable over the given byte range.
* @param bytes the byte array to use as backing storage
* @param offset the offset in <code>bytes</code> to start at
* @param length the number of bytes in the range
*/
public SparkImmutableBytesWritable(final byte[] bytes, final int offset,
final int length) {
this.bytes = bytes;
this.offset = offset;
this.length = length;
}

/**
* Get the backing byte array.
* @return the backing array; the data is only valid between offset and offset+length.
*/
public byte [] get() {
if (this.bytes == null) {
throw new IllegalStateException("Uninitialiized. Null constructor " +
"called w/o accompaying readFields invocation");
}
return this.bytes;
}

/**
* @param b Use passed bytes as backing array for this instance.
*/
public void set(final byte [] b) {
set(b, 0, b.length);
}

/**
* @param b Use passed bytes as backing array for this instance.
* @param offset the offset in <code>b</code> to start at
* @param length the number of valid bytes in the range
*/
public void set(final byte [] b, final int offset, final int length) {
this.bytes = b;
this.offset = offset;
this.length = length;
}

/**
* @return the number of valid bytes in the buffer
* @deprecated use {@link #getLength()} instead
*/
@Deprecated
public int getSize() {
if (this.bytes == null) {
throw new IllegalStateException("Uninitialiized. Null constructor " +
"called w/o accompaying readFields invocation");
}
return this.length;
}

/**
* @return the number of valid bytes in the buffer
*/
public int getLength() {
if (this.bytes == null) {
throw new IllegalStateException("Uninitialiized. Null constructor " +
"called w/o accompaying readFields invocation");
}
return this.length;
}

/**
* @return the offset into the backing byte array
*/
public int getOffset() {
return this.offset;
}

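/**
* Deserialize: read a 4-byte length followed by that many bytes into a
* fresh backing array; the offset is reset to 0.
*/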
public void readFields(final DataInput in) throws IOException {
this.length = in.readInt();
this.bytes = new byte[this.length];
in.readFully(this.bytes, 0, this.length);
this.offset = 0;
}

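/**
* Serialize: write the 4-byte length, then only the valid range
* [offset, offset+length) of the backing array.
*/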
public void write(final DataOutput out) throws IOException {
out.writeInt(this.length);
out.write(this.bytes, this.offset, this.length);
}

// Below methods copied from BytesWritable
@Override
public int hashCode() {
int hash = 1;
for (int i = offset; i < offset + length; i++)
hash = (31 * hash) + (int)bytes[i];
return hash;
}

/**
* Define the sort order of the BytesWritable.
* @param that The other bytes writable
* @return Positive if left is bigger than right, 0 if they are equal, and
* negative if left is smaller than right.
*/
public int compareTo(SparkImmutableBytesWritable that) {
return WritableComparator.compareBytes(
this.bytes, this.offset, this.length,
that.bytes, that.offset, that.length);
}

/**
* Compares the bytes in this object to the specified byte array
* @param that the byte array to compare against
* @return Positive if left is bigger than right, 0 if they are equal, and
* negative if left is smaller than right.
*/
public int compareTo(final byte [] that) {
return WritableComparator.compareBytes(
this.bytes, this.offset, this.length,
that, 0, that.length);
}

/**
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object right_obj) {
if (right_obj instanceof byte []) {
return compareTo((byte [])right_obj) == 0;
}
if (right_obj instanceof SparkImmutableBytesWritable) {
return compareTo((SparkImmutableBytesWritable)right_obj) == 0;
}
return false;
}

/**
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
StringBuilder sb = new StringBuilder(3*this.length);
final int endIdx = this.offset + this.length;
for (int idx = this.offset; idx < endIdx ; idx++) {
sb.append(' ');
String num = Integer.toHexString(0xff & this.bytes[idx]);
// if it is only one digit, add a leading 0.
if (num.length() < 2) {
sb.append('0');
}
sb.append(num);
}
return sb.length() > 0 ? sb.substring(1) : "";
}

/** A Comparator optimized for SparkImmutableBytesWritable.
*/
public static class Comparator extends WritableComparator {
private BytesWritable.Comparator comparator =
new BytesWritable.Comparator();

/** constructor */
public Comparator() {
super(SparkImmutableBytesWritable.class);
}

/**
* @see org.apache.hadoop.io.WritableComparator#compare(byte[], int, int, byte[], int, int)
*/
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return comparator.compare(b1, s1, l1, b2, s2, l2);
}
}

static { // register this comparator
WritableComparator.define(SparkImmutableBytesWritable.class, new Comparator());
}

/**
* @param array List of byte [].
* @return Array of byte [].
*/
public static byte [][] toArray(final List<byte []> array) {
// List#toArray doesn't work on lists of byte [].
byte[][] results = new byte[array.size()][];
for (int i = 0; i < array.size(); i++) {
results[i] = array.get(i);
}
return results;
}

/**
* Returns a copy of the bytes referred to by this writable
*/
public byte[] copyBytes() {
return Arrays.copyOfRange(bytes, offset, offset+length);
}
}
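
A minimal usage sketch of the writable above (the row-key strings and the example object name are invented for illustration):

import org.apache.spark.sql.hbase.SparkImmutableBytesWritable

object WritableExample {
  def main(args: Array[String]): Unit = {
    // The passed arrays become the backing storage; no copy is made.
    val a = new SparkImmutableBytesWritable("row-001".getBytes("UTF-8"))
    val b = new SparkImmutableBytesWritable("row-002".getBytes("UTF-8"))

    // Lexicographic, unsigned byte-order comparison (HBase row-key order).
    assert(a.compareTo(b) < 0)

    // copyBytes() returns a defensive copy of just the valid range.
    println(a.copyBytes().length) // 7
  }
}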
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hbase

import java.io.{IOException, ObjectInputStream, ObjectOutputStream}

import scala.reflect.ClassTag

import org.apache.spark.{Partitioner, SparkEnv}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.{CollectionsUtils, Utils}

class HBasePartitioner[K : Ordering : ClassTag, V](
@transient rdd: RDD[_ <: Product2[K, V]])(splitKeys: Array[K])
extends Partitioner {

private var ordering = implicitly[Ordering[K]]

private var rangeBounds: Array[K] = splitKeys

def numPartitions: Int = rangeBounds.length + 1

private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

// TODO: test this
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// With at most 128 range bounds, a naive linear scan is fast enough.
while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
} else {
// For larger bound arrays, use the binary search built once at construction.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition-1
}
if (partition > rangeBounds.length) {
partition = rangeBounds.length
}
}
partition
}

override def equals(other: Any): Boolean = other match {
case r: HBasePartitioner[_,_] =>
r.rangeBounds.sameElements(rangeBounds)
case _ =>
false
}

override def hashCode(): Int = {
val prime = 31
var result = 1
var i = 0
while (i < rangeBounds.length) {
result = prime * result + rangeBounds(i).hashCode
i += 1
}
result = prime * result
result
}

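// Write/read rangeBounds with the serializer configured in SparkEnv (e.g. Kryo);
// plain Java serialization falls back to the default object format.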
@throws(classOf[IOException])
private def writeObject(out: ObjectOutputStream) {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => out.defaultWriteObject()
case _ =>
out.writeObject(ordering)
out.writeObject(binarySearch)

val ser = sfactory.newInstance()
Utils.serializeViaNestedStream(out, ser) { stream =>
stream.writeObject(scala.reflect.classTag[Array[K]])
stream.writeObject(rangeBounds)
}
}
}

@throws(classOf[IOException])
private def readObject(in: ObjectInputStream) {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => in.defaultReadObject()
case _ =>
ordering = in.readObject().asInstanceOf[Ordering[K]]
binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]

val ser = sfactory.newInstance()
Utils.deserializeViaNestedStream(in, ser) { ds =>
implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
rangeBounds = ds.readObject[Array[K]]()
}
}
}
}

// TODO: test this
object HBasePartitioner {
// Implicit row-key ordering used by the partitioner's Ordering context bound.
// Declared with an explicit type: a type parameter named SparkImmutableBytesWritable
// would shadow the class and make the implicit apply to every type.
implicit val orderingRowKey: Ordering[SparkImmutableBytesWritable] = OrderingRowKey
}

object OrderingRowKey extends Ordering[SparkImmutableBytesWritable] {
def compare(a: SparkImmutableBytesWritable, b: SparkImmutableBytesWritable): Int = a.compareTo(b)
}
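
A minimal, hypothetical sketch of wiring the partitioner up (the split keys and object name are invented; in a real deployment the splits would come from the table's region boundaries):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hbase.{HBasePartitioner, SparkImmutableBytesWritable}
import org.apache.spark.sql.hbase.HBasePartitioner.orderingRowKey

object PartitionerExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("example").setMaster("local[2]"))

    // Hypothetical region split keys: numPartitions = splitKeys.length + 1.
    val splitKeys = Array("b", "d").map(s => new SparkImmutableBytesWritable(s.getBytes("UTF-8")))

    val rows = sc.parallelize(Seq("a", "c", "e").map { s =>
      (new SparkImmutableBytesWritable(s.getBytes("UTF-8")), s)
    })

    // Keys <= "b" land in partition 0, ("b", "d"] in 1, the rest in 2.
    val partitioned = rows.partitionBy(new HBasePartitioner(rows)(splitKeys))
    println(partitioned.partitioner.map(_.numPartitions)) // Some(3)
    sc.stop()
  }
}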