@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar;
+
+import org.apache.spark.sql.execution.vectorized.Dictionary;
+
+public final class ColumnDictionary implements Dictionary {
+  private int[] intDictionary;
+  private long[] longDictionary;
+
+  public ColumnDictionary(int[] dictionary) {
+    this.intDictionary = dictionary;
+  }
+
+  public ColumnDictionary(long[] dictionary) {
+    this.longDictionary = dictionary;
+  }
+
+  @Override
+  public int decodeToInt(int id) {
+    return intDictionary[id];
+  }
+
+  @Override
+  public long decodeToLong(int id) {
+    return longDictionary[id];
+  }
+
+  @Override
+  public float decodeToFloat(int id) {
+    throw new UnsupportedOperationException("Dictionary encoding does not support float");
+  }
+
+  @Override
+  public double decodeToDouble(int id) {
+    throw new UnsupportedOperationException("Dictionary encoding does not support double");
+  }
+
+  @Override
+  public byte[] decodeToBinary(int id) {
+    throw new UnsupportedOperationException("Dictionary encoding does not support String");
+  }
+}
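
The new file above is ColumnDictionary.java. A minimal Scala sketch of its decode contract; the values and assertions are illustrative, not from the PR:

  import org.apache.spark.sql.execution.columnar.ColumnDictionary

  // An int dictionary maps a dictionary id (the array index) to its decoded value.
  val intDict = new ColumnDictionary(Array(100, 200, 300))
  assert(intDict.decodeToInt(1) == 200)

  // Array(10L, 20L) selects the long[] constructor; unsupported types throw.
  val longDict = new ColumnDictionary(Array(10L, 20L))
  assert(longDict.decodeToLong(0) == 10L)
  // longDict.decodeToFloat(0) would throw UnsupportedOperationException
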
@@ -228,6 +228,12 @@ public void putShorts(int rowId, int count, short[] src, int srcIndex) {
       null, data + 2 * rowId, count * 2);
   }
 
+  @Override
+  public void putShorts(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
+      null, data + rowId * 2, count * 2);
+  }
+
   @Override
   public short getShort(int rowId) {
     if (dictionary == null) {
@@ -268,6 +274,12 @@ public void putInts(int rowId, int count, int[] src, int srcIndex) {
       null, data + 4 * rowId, count * 4);
   }
 
+  @Override
+  public void putInts(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
+      null, data + rowId * 4, count * 4);
+  }
+
   @Override
   public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
     if (!bigEndianPlatform) {
@@ -334,6 +346,12 @@ public void putLongs(int rowId, int count, long[] src, int srcIndex) {
       null, data + 8 * rowId, count * 8);
   }
 
+  @Override
+  public void putLongs(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
+      null, data + rowId * 8, count * 8);
+  }
+
   @Override
   public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
     if (!bigEndianPlatform) {
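
These hunks appear to be from OffHeapColumnVector. Each new byte[] overload copies count elements' worth of raw bytes from the source array straight to the off-heap address data + rowId * elementSize, with no per-element conversion. A hedged Scala sketch of that copy primitive, using a locally allocated buffer rather than the vector's real data field:

  import org.apache.spark.unsafe.Platform

  val count = 2
  val src = Array[Byte](1, 0, 0, 0, 2, 0, 0, 0)  // two ints, as native-endian bytes on x86
  val data = Platform.allocateMemory(count * 4)  // stand-in for the vector's off-heap buffer
  Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET, null, data, count * 4)
  println(Platform.getInt(null, data + 4))       // 2 on a little-endian platform
  Platform.freeMemory(data)
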
@@ -233,6 +233,12 @@ public void putShorts(int rowId, int count, short[] src, int srcIndex) {
     System.arraycopy(src, srcIndex, shortData, rowId, count);
   }
 
+  @Override
+  public void putShorts(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, shortData,
+      Platform.SHORT_ARRAY_OFFSET + rowId * 2, count * 2);
+  }
+
   @Override
   public short getShort(int rowId) {
     if (dictionary == null) {
@@ -272,6 +278,12 @@ public void putInts(int rowId, int count, int[] src, int srcIndex) {
     System.arraycopy(src, srcIndex, intData, rowId, count);
   }
 
+  @Override
+  public void putInts(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, intData,
+      Platform.INT_ARRAY_OFFSET + rowId * 4, count * 4);
+  }
+
   @Override
   public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
     int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
@@ -332,6 +344,12 @@ public void putLongs(int rowId, int count, long[] src, int srcIndex) {
     System.arraycopy(src, srcIndex, longData, rowId, count);
   }
 
+  @Override
+  public void putLongs(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, longData,
+      Platform.LONG_ARRAY_OFFSET + rowId * 8, count * 8);
+  }
+
   @Override
   public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
     int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
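
The on-heap counterparts (apparently OnHeapColumnVector) perform the same raw copy, but the destination is a primitive Java array addressed through its base offset. A small Scala sketch of the reinterpreting copy outside the vector class; the array names mirror the fields above:

  import org.apache.spark.unsafe.Platform

  val src = Array[Byte](1, 0, 2, 0)   // two shorts, as little-endian bytes
  val shortData = new Array[Short](2)
  Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET,
    shortData, Platform.SHORT_ARRAY_OFFSET, 4)
  println(shortData.mkString(","))    // "1,2" on a little-endian platform
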
@@ -113,138 +113,156 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
   protected abstract void reserveInternal(int capacity);
 
   /**
-   * Sets the value at rowId to null/not null.
+   * Sets null/not null to the value at rowId.
    */
   public abstract void putNotNull(int rowId);
   public abstract void putNull(int rowId);
 
   /**
-   * Sets the values from [rowId, rowId + count) to null/not null.
+   * Sets null/not null to the values at [rowId, rowId + count).
    */
   public abstract void putNulls(int rowId, int count);
   public abstract void putNotNulls(int rowId, int count);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets `value` to the value at rowId.
    */
   public abstract void putBoolean(int rowId, boolean value);
 
   /**
-   * Sets values from [rowId, rowId + count) to value.
+   * Sets value to [rowId, rowId + count).
    */
   public abstract void putBooleans(int rowId, int count, boolean value);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets `value` to the value at rowId.
    */
   public abstract void putByte(int rowId, byte value);
 
   /**
-   * Sets values from [rowId, rowId + count) to value.
+   * Sets value to [rowId, rowId + count).
    */
   public abstract void putBytes(int rowId, int count, byte value);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   * Sets values from [src[srcIndex], src[srcIndex + count]) to [rowId, rowId + count)
    */
   public abstract void putBytes(int rowId, int count, byte[] src, int srcIndex);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets `value` to the value at rowId.
    */
   public abstract void putShort(int rowId, short value);
 
   /**
-   * Sets values from [rowId, rowId + count) to value.
+   * Sets value to [rowId, rowId + count).
    */
   public abstract void putShorts(int rowId, int count, short value);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   * Sets values from [src[srcIndex], src[srcIndex + count]) to [rowId, rowId + count)
    */
   public abstract void putShorts(int rowId, int count, short[] src, int srcIndex);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets values from [src[srcIndex], src[srcIndex + count * 2]) to [rowId, rowId + count)
+   * The data in src must be 2-byte platform native endian shorts.
    */
+  public abstract void putShorts(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Sets `value` to the value at rowId.
+   */
   public abstract void putInt(int rowId, int value);
 
   /**
-   * Sets values from [rowId, rowId + count) to value.
+   * Sets value to [rowId, rowId + count).
    */
   public abstract void putInts(int rowId, int count, int value);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   * Sets values from [src[srcIndex], src[srcIndex + count]) to [rowId, rowId + count)
    */
   public abstract void putInts(int rowId, int count, int[] src, int srcIndex);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+   * Sets values from [src[srcIndex], src[srcIndex + count * 4]) to [rowId, rowId + count)
+   * The data in src must be 4-byte platform native endian ints.
+   */
+  public abstract void putInts(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Sets values from [src[srcIndex], src[srcIndex + count * 4]) to [rowId, rowId + count)
    * The data in src must be 4-byte little endian ints.
    */
   public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets `value` to the value at rowId.
    */
   public abstract void putLong(int rowId, long value);
 
   /**
-   * Sets values from [rowId, rowId + count) to value.
+   * Sets value to [rowId, rowId + count).
    */
   public abstract void putLongs(int rowId, int count, long value);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   * Sets values from [src[srcIndex], src[srcIndex + count]) to [rowId, rowId + count)
    */
   public abstract void putLongs(int rowId, int count, long[] src, int srcIndex);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+   * Sets values from [src[srcIndex], src[srcIndex + count * 8]) to [rowId, rowId + count)
+   * The data in src must be 8-byte platform native endian longs.
+   */
+  public abstract void putLongs(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Sets values from [src + srcIndex, src + srcIndex + count * 8) to [rowId, rowId + count)
    * The data in src must be 8-byte little endian longs.
    */
   public abstract void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets `value` to the value at rowId.
    */
   public abstract void putFloat(int rowId, float value);
 
   /**
-   * Sets values from [rowId, rowId + count) to value.
+   * Sets value to [rowId, rowId + count).
    */
   public abstract void putFloats(int rowId, int count, float value);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   * Sets values from [src[srcIndex], src[srcIndex + count]) to [rowId, rowId + count)
    */
   public abstract void putFloats(int rowId, int count, float[] src, int srcIndex);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
-   * The data in src must be ieee formatted floats.
+   * Sets values from [src[srcIndex], src[srcIndex + count * 4]) to [rowId, rowId + count)
+   * The data in src must be ieee formatted floats in platform native endian.
    */
   public abstract void putFloats(int rowId, int count, byte[] src, int srcIndex);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets `value` to the value at rowId.
    */
   public abstract void putDouble(int rowId, double value);
 
   /**
-   * Sets values from [rowId, rowId + count) to value.
+   * Sets value to [rowId, rowId + count).
    */
   public abstract void putDoubles(int rowId, int count, double value);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   * Sets values from [src[srcIndex], src[srcIndex + count]) to [rowId, rowId + count)
    */
   public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
-   * The data in src must be ieee formatted doubles.
+   * Sets values from [src[srcIndex], src[srcIndex + count * 8]) to [rowId, rowId + count)
+   * The data in src must be ieee formatted doubles in platform native endian.
    */
   public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);
 
@@ -254,7 +272,7 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
   public abstract void putArray(int rowId, int offset, int length);
 
   /**
-   * Sets the value at rowId to `value`.
+   * Sets values from [value + offset, value + offset + count) to the values at rowId.
    */
   public abstract int putByteArray(int rowId, byte[] value, int offset, int count);
   public final int putByteArray(int rowId, byte[] value) {
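
The Javadoc changes above (WritableColumnVector) pin down the contract of the new byte[] putters: putShorts/putInts/putLongs(byte[]) expect platform native-endian bytes, while the existing *LittleEndian variants always take little-endian input. A hedged Scala sketch against the on-heap implementation; the values are illustrative:

  import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
  import org.apache.spark.sql.types.IntegerType

  val vec = new OnHeapColumnVector(4, IntegerType)
  val le = Array[Byte](1, 0, 0, 0, 2, 0, 0, 0)  // 1 and 2 as little-endian ints
  vec.putIntsLittleEndian(0, 2, le, 0)          // portable across platforms
  // vec.putInts(0, 2, le, 0) would be correct only on little-endian hardware
  println(s"${vec.getInt(0)},${vec.getInt(1)}") // 1,2
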
@@ -24,6 +24,7 @@ import scala.annotation.tailrec
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeMapData, UnsafeRow}
 import org.apache.spark.sql.execution.columnar.compression.CompressibleColumnAccessor
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector
 import org.apache.spark.sql.types._
 
 /**
@@ -62,6 +63,9 @@ private[columnar] abstract class BasicColumnAccessor[JvmType](
   }
 
   protected def underlyingBuffer = buffer
+
+  def getByteBuffer: ByteBuffer =
+    buffer.duplicate.order(ByteOrder.nativeOrder())
 }
 
 private[columnar] class NullColumnAccessor(buffer: ByteBuffer)
@@ -122,7 +126,7 @@ private[columnar] class MapColumnAccessor(buffer: ByteBuffer, dataType: MapType)
   extends BasicColumnAccessor[UnsafeMapData](buffer, MAP(dataType))
   with NullableColumnAccessor
 
-private[columnar] object ColumnAccessor {
+private[sql] object ColumnAccessor {
   @tailrec
   def apply(dataType: DataType, buffer: ByteBuffer): ColumnAccessor = {
     val buf = buffer.order(ByteOrder.nativeOrder)
@@ -149,4 +153,14 @@ private[columnar] object ColumnAccessor {
       throw new Exception(s"not support type: $other")
     }
   }
+
+  def decompress(columnAccessor: ColumnAccessor, columnVector: WritableColumnVector, numRows: Int):
+      Unit = {
+    if (columnAccessor.isInstanceOf[NativeColumnAccessor[_]]) {
+      val nativeAccessor = columnAccessor.asInstanceOf[NativeColumnAccessor[_]]
+      nativeAccessor.decompress(columnVector, numRows)
+    } else {
+      throw new RuntimeException("Not support non-primitive type now")
+    }
+  }
 }
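
The new decompress hook in ColumnAccessor.scala (note the widened private[sql] visibility) lets a caller bulk-decode a compressed column buffer into a writable vector instead of pulling values row by row. A hedged Scala sketch from inside the org.apache.spark.sql package; compressedBuffer and numRows are assumed inputs, and only primitive (native) column types are supported:

  import java.nio.ByteBuffer
  import org.apache.spark.sql.execution.columnar.ColumnAccessor
  import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
  import org.apache.spark.sql.types.IntegerType

  val numRows = 10
  // Assumed input: one column's compressed bytes, e.g. produced by the columnar cache builder.
  val compressedBuffer: ByteBuffer = ???

  val accessor = ColumnAccessor(IntegerType, compressedBuffer)
  val vector = new OnHeapColumnVector(numRows, IntegerType)
  ColumnAccessor.decompress(accessor, vector, numRows)
  // vector.getInt(i) now serves decoded values directly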