@@ -18,18 +18,27 @@

package org.apache.flink.orc.vector;

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;

import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

@@ -151,8 +160,118 @@ private static void setColumn(
vector.set(rowId, timestamp);
break;
}
case ARRAY:
{
ListColumnVector listColumnVector = (ListColumnVector) column;
setColumn(rowId, listColumnVector, type, row, columnId);
break;
}
case MAP:
{
MapColumnVector mapColumnVector = (MapColumnVector) column;
setColumn(rowId, mapColumnVector, type, row, columnId);
break;
}
case ROW:
{
StructColumnVector structColumnVector = (StructColumnVector) column;
setColumn(rowId, structColumnVector, type, row, columnId);
break;
}
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}

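/**
 * Writes an ARRAY value into a {@link ListColumnVector}: records the element count and the
 * start offset of this row's elements in the shared child vector, then writes each element
 * recursively through the generic {@code setColumn(int, ColumnVector, LogicalType, RowData, int)}
 * overload.
 */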
private static void setColumn(
int rowId,
ListColumnVector listColumnVector,
LogicalType type,
RowData row,
int columnId) {
ArrayData arrayData = row.getArray(columnId);
ArrayType arrayType = (ArrayType) type;
listColumnVector.lengths[rowId] = arrayData.size();
listColumnVector.offsets[rowId] = listColumnVector.childCount;
listColumnVector.childCount += listColumnVector.lengths[rowId];
listColumnVector.child.ensureSize(
listColumnVector.childCount, listColumnVector.offsets[rowId] != 0);

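// Wrap the array elements in a temporary RowData so the generic setColumn overload can be
// reused for each element.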
RowData convertedRowData = convert(arrayData, arrayType.getElementType());
for (int i = 0; i < arrayData.size(); i++) {
setColumn(
(int) listColumnVector.offsets[rowId] + i,
listColumnVector.child,
arrayType.getElementType(),
convertedRowData,
i);
}
}

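/**
 * Writes a MAP value into a {@link MapColumnVector}: records the entry count and the start
 * offset of this row's entries, then writes the key and value arrays element by element into
 * the keys and values child vectors.
 */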
private static void setColumn(
int rowId,
MapColumnVector mapColumnVector,
LogicalType type,
RowData row,
int columnId) {
MapData mapData = row.getMap(columnId);
MapType mapType = (MapType) type;
ArrayData keyArray = mapData.keyArray();
ArrayData valueArray = mapData.valueArray();
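// Record where this row's entries start in the shared keys/values vectors and how many
// there are, then grow both child vectors accordingly.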
mapColumnVector.lengths[rowId] = mapData.size();
mapColumnVector.offsets[rowId] = mapColumnVector.childCount;
mapColumnVector.childCount += mapColumnVector.lengths[rowId];
mapColumnVector.keys.ensureSize(
mapColumnVector.childCount, mapColumnVector.offsets[rowId] != 0);
mapColumnVector.values.ensureSize(
mapColumnVector.childCount, mapColumnVector.offsets[rowId] != 0);

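// Wrap the key and value arrays in temporary RowData instances so the generic setColumn
// overload can be reused for each entry.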
RowData convertedKeyRowData = convert(keyArray, mapType.getKeyType());
RowData convertedValueRowData = convert(valueArray, mapType.getValueType());
for (int i = 0; i < keyArray.size(); i++) {
setColumn(
(int) mapColumnVector.offsets[rowId] + i,
mapColumnVector.keys,
mapType.getKeyType(),
convertedKeyRowData,
i);
setColumn(
(int) mapColumnVector.offsets[rowId] + i,
mapColumnVector.values,
mapType.getValueType(),
convertedValueRowData,
i);
}
}

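/**
 * Writes a ROW value into a {@link StructColumnVector} by writing each field of the nested row
 * into the corresponding child vector at the same row id.
 */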
private static void setColumn(
int rowId,
StructColumnVector structColumnVector,
LogicalType type,
RowData row,
int columnId) {
RowData structRow = row.getRow(columnId, structColumnVector.fields.length);
RowType rowType = (RowType) type;
for (int i = 0; i < structRow.getArity(); i++) {
ColumnVector cv = structColumnVector.fields[i];
setColumn(rowId, cv, rowType.getTypeAt(i), structRow, i);
}
}

/**
 * Converts the given ArrayData into a RowData so that {@link RowDataVectorizer#setColumn(int,
 * ColumnVector, LogicalType, RowData, int)} can be called recursively on its elements.
 *
 * @param arrayData the input ArrayData.
 * @param arrayFieldType the LogicalType of the elements of the input ArrayData.
 * @return a GenericRowData whose fields are the elements of the input ArrayData.
 */
private static RowData convert(ArrayData arrayData, LogicalType arrayFieldType) {
GenericRowData rowData = new GenericRowData(arrayData.size());
ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(arrayFieldType);
for (int i = 0; i < arrayData.size(); i++) {
rowData.setField(i, elementGetter.getElementOrNull(arrayData, i));
}
return rowData;
}
}