Skip to content

Commit f378335

Browse files
committed
CR: made struct/array extend the catalyst types
1 parent 2918077 commit f378335

File tree

4 files changed

+221
-118
lines changed

4 files changed

+221
-118
lines changed

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java

Lines changed: 203 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,14 @@
1717
package org.apache.spark.sql.execution.vectorized;
1818

1919
import org.apache.spark.memory.MemoryMode;
20+
import org.apache.spark.sql.catalyst.InternalRow;
21+
import org.apache.spark.sql.catalyst.util.ArrayData;
22+
import org.apache.spark.sql.catalyst.util.MapData;
2023
import org.apache.spark.sql.types.*;
24+
import org.apache.spark.unsafe.types.CalendarInterval;
25+
import org.apache.spark.unsafe.types.UTF8String;
26+
27+
import org.apache.commons.lang.NotImplementedException;
2128

2229
/**
2330
* This class represents a column of values and provides the main APIs to access the data
@@ -61,13 +68,8 @@ public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode
6168
/**
6269
* Holder object to return an array. This object is intended to be reused. Callers should
6370
* copy the data out if it needs to be stored.
64-
* TODO: consider adding all the get APIs that are 0 indexed. This would just be a convenience
65-
* wrapper. Measure this for common usage patterns and see if there is an overhead.
66-
* int getInt(index) {
67-
* return data.getInt(index + offset);
68-
* }
6971
*/
70-
public static final class Array {
72+
public static final class Array extends ArrayData {
7173
// The data for this array. This array contains elements from
7274
// data[offset] to data[offset + length).
7375
public final ColumnVector data;
@@ -85,27 +87,220 @@ public static final class Array {
8587
protected Array(ColumnVector data) {
8688
this.data = data;
8789
}
90+
91+
@Override
92+
public final int numElements() { return length; }
93+
94+
@Override
95+
public ArrayData copy() {
96+
throw new NotImplementedException();
97+
}
98+
99+
// TODO: this is extremely expensive.
100+
@Override
101+
public Object[] array() {
102+
DataType dt = data.dataType();
103+
Object[] list = new Object[length];
104+
105+
if (dt instanceof ByteType) {
106+
for (int i = 0; i < length; i++) {
107+
if (!data.getIsNull(offset + i)) {
108+
list[i] = data.getByte(offset + i);
109+
}
110+
}
111+
} else if (dt instanceof IntegerType) {
112+
for (int i = 0; i < length; i++) {
113+
if (!data.getIsNull(offset + i)) {
114+
list[i] = data.getInt(offset + i);
115+
}
116+
}
117+
} else if (dt instanceof DoubleType) {
118+
for (int i = 0; i < length; i++) {
119+
if (!data.getIsNull(offset + i)) {
120+
list[i] = data.getDouble(offset + i);
121+
}
122+
}
123+
} else if (dt instanceof LongType) {
124+
for (int i = 0; i < length; i++) {
125+
if (!data.getIsNull(offset + i)) {
126+
list[i] = data.getLong(offset + i);
127+
}
128+
}
129+
} else if (dt instanceof StringType) {
130+
for (int i = 0; i < length; i++) {
131+
if (!data.getIsNull(offset + i)) {
132+
list[i] = ColumnVectorUtils.toString(data.getByteArray(offset + i));
133+
}
134+
}
135+
} else {
136+
throw new NotImplementedException("Type " + dt);
137+
}
138+
return list;
139+
}
140+
141+
@Override
142+
public final boolean isNullAt(int ordinal) { return data.getIsNull(offset + ordinal); }
143+
144+
@Override
145+
public final boolean getBoolean(int ordinal) {
146+
throw new NotImplementedException();
147+
}
148+
149+
@Override
150+
public byte getByte(int ordinal) { return data.getByte(offset + ordinal); }
151+
152+
@Override
153+
public short getShort(int ordinal) {
154+
throw new NotImplementedException();
155+
}
156+
157+
@Override
158+
public int getInt(int ordinal) { return data.getInt(offset + ordinal); }
159+
160+
@Override
161+
public long getLong(int ordinal) { return data.getLong(offset + ordinal); }
162+
163+
@Override
164+
public float getFloat(int ordinal) {
165+
throw new NotImplementedException();
166+
}
167+
168+
@Override
169+
public double getDouble(int ordinal) { return data.getDouble(offset + ordinal); }
170+
171+
@Override
172+
public Decimal getDecimal(int ordinal, int precision, int scale) {
173+
throw new NotImplementedException();
174+
}
175+
176+
@Override
177+
public UTF8String getUTF8String(int ordinal) {
178+
Array child = data.getByteArray(offset + ordinal);
179+
return UTF8String.fromBytes(child.byteArray, child.byteArrayOffset, child.length);
180+
}
181+
182+
@Override
183+
public byte[] getBinary(int ordinal) {
184+
throw new NotImplementedException();
185+
}
186+
187+
@Override
188+
public CalendarInterval getInterval(int ordinal) {
189+
throw new NotImplementedException();
190+
}
191+
192+
@Override
193+
public InternalRow getStruct(int ordinal, int numFields) {
194+
throw new NotImplementedException();
195+
}
196+
197+
@Override
198+
public ArrayData getArray(int ordinal) {
199+
return data.getArray(offset + ordinal);
200+
}
201+
202+
@Override
203+
public MapData getMap(int ordinal) {
204+
throw new NotImplementedException();
205+
}
206+
207+
@Override
208+
public Object get(int ordinal, DataType dataType) {
209+
throw new NotImplementedException();
210+
}
88211
}
89212

90213
/**
91214
* Holder object to return a struct. This object is intended to be reused.
92215
*/
93-
public static final class Struct {
216+
public static final class Struct extends InternalRow {
94217
// The fields that make up this struct. For example, if the struct had 2 int fields, the access
95218
// to it would be:
96219
// int f1 = fields[0].getInt[rowId]
97220
// int f2 = fields[1].getInt[rowId]
98221
public final ColumnVector[] fields;
99222

100-
public boolean getIsNull(int fieldIdx) { return fields[fieldIdx].getIsNull(rowId); }
223+
@Override
224+
public boolean isNullAt(int fieldIdx) { return fields[fieldIdx].getIsNull(rowId); }
225+
226+
@Override
227+
public boolean getBoolean(int ordinal) {
228+
throw new NotImplementedException();
229+
}
230+
101231
public byte getByte(int fieldIdx) { return fields[fieldIdx].getByte(rowId); }
232+
233+
@Override
234+
public short getShort(int ordinal) {
235+
throw new NotImplementedException();
236+
}
237+
102238
public int getInt(int fieldIdx) { return fields[fieldIdx].getInt(rowId); }
103239
public long getLong(int fieldIdx) { return fields[fieldIdx].getLong(rowId); }
240+
241+
@Override
242+
public float getFloat(int ordinal) {
243+
throw new NotImplementedException();
244+
}
245+
104246
public double getDouble(int fieldIdx) { return fields[fieldIdx].getDouble(rowId); }
247+
248+
@Override
249+
public Decimal getDecimal(int ordinal, int precision, int scale) {
250+
throw new NotImplementedException();
251+
}
252+
253+
@Override
254+
public UTF8String getUTF8String(int ordinal) {
255+
Array a = getByteArray(ordinal);
256+
return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
257+
}
258+
259+
@Override
260+
public byte[] getBinary(int ordinal) {
261+
throw new NotImplementedException();
262+
}
263+
264+
@Override
265+
public CalendarInterval getInterval(int ordinal) {
266+
throw new NotImplementedException();
267+
}
268+
269+
@Override
270+
public InternalRow getStruct(int ordinal, int numFields) {
271+
return fields[ordinal].getStruct(rowId);
272+
}
273+
105274
public Array getArray(int fieldIdx) { return fields[fieldIdx].getArray(rowId); }
275+
276+
@Override
277+
public MapData getMap(int ordinal) {
278+
throw new NotImplementedException();
279+
}
280+
281+
@Override
282+
public Object get(int ordinal, DataType dataType) {
283+
throw new NotImplementedException();
284+
}
285+
106286
public Array getByteArray(int fieldIdx) { return fields[fieldIdx].getByteArray(rowId); }
107287
public Struct getStruct(int fieldIdx) { return fields[fieldIdx].getStruct(rowId); }
108288

289+
@Override
290+
public final int numFields() {
291+
return fields.length;
292+
}
293+
294+
@Override
295+
public InternalRow copy() {
296+
throw new NotImplementedException();
297+
}
298+
299+
@Override
300+
public boolean anyNull() {
301+
throw new NotImplementedException();
302+
}
303+
109304
protected int rowId;
110305

111306
protected Struct(ColumnVector[] fields) {

sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java

Lines changed: 0 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,12 @@
1616
*/
1717
package org.apache.spark.sql.execution.vectorized;
1818

19-
import java.util.ArrayList;
2019
import java.util.Iterator;
2120
import java.util.List;
2221

2322
import org.apache.spark.memory.MemoryMode;
2423
import org.apache.spark.sql.Row;
25-
import org.apache.spark.sql.catalyst.InternalRow;
26-
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
27-
import org.apache.spark.sql.catalyst.expressions.GenericRow;
28-
import org.apache.spark.sql.catalyst.expressions.StringTranslate;
29-
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
30-
import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
31-
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
32-
import org.apache.spark.sql.catalyst.util.GenericArrayData;
3324
import org.apache.spark.sql.types.*;
34-
import org.apache.spark.unsafe.Platform;
35-
import org.apache.spark.unsafe.types.UTF8String;
3625

3726
import org.apache.commons.lang.NotImplementedException;
3827

@@ -68,93 +57,6 @@ public static Object toPrimitiveJavaArray(ColumnVector.Array array) {
6857
}
6958
}
7059

71-
public static GenericMutableRow toRow(ColumnVector.Struct struct) {
72-
GenericMutableRow row = new GenericMutableRow(struct.fields.length);
73-
74-
for (int i = 0; i < struct.fields.length; i++) {
75-
if (struct.getIsNull(i)) {
76-
row.setNullAt(i);
77-
} else {
78-
DataType dt = struct.fields[i].dataType();
79-
if (dt instanceof ByteType) {
80-
row.setByte(i, struct.getByte(i));
81-
} else if (dt instanceof IntegerType) {
82-
row.setInt(i, struct.getInt(i));
83-
} else if (dt instanceof LongType) {
84-
row.setLong(i, struct.getLong(i));
85-
} else if (dt instanceof DoubleType) {
86-
row.setDouble(i, struct.getDouble(i));
87-
} else if (dt instanceof StringType) {
88-
ColumnVector.Array a = struct.getByteArray(i);
89-
row.update(i, UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length));
90-
} else if (dt instanceof StructType) {
91-
GenericMutableRow child = toRow(struct.getStruct(i));
92-
row.update(i, child);
93-
} else if (dt instanceof ArrayType) {
94-
row.update(i, toGenericArray(struct.getArray(i)));
95-
} else {
96-
throw new RuntimeException("Not implemented. " + dt);
97-
}
98-
}
99-
}
100-
101-
return row;
102-
}
103-
104-
/**
105-
* Converts an ColumnVector array into a GenericArrayData. This is very expensive to do.
106-
*/
107-
public static GenericArrayData toGenericArray(ColumnVector.Array array) {
108-
DataType dt = array.data.dataType();
109-
List<Object> list = new ArrayList<Object>(array.length);
110-
ColumnVector data = array.data;
111-
112-
if (dt instanceof ByteType) {
113-
for (int i = 0; i < array.length; i++) {
114-
if (data.getIsNull(array.offset + i)) {
115-
list.add(null);
116-
} else {
117-
list.add(data.getByte(array.offset + i));
118-
}
119-
}
120-
} else if (dt instanceof IntegerType) {
121-
for (int i = 0; i < array.length; i++) {
122-
if (data.getIsNull(array.offset + i)) {
123-
list.add(null);
124-
} else {
125-
list.add(data.getInt(array.offset + i));
126-
}
127-
}
128-
} else if (dt instanceof DoubleType) {
129-
for (int i = 0; i < array.length; i++) {
130-
if (data.getIsNull(array.offset + i)) {
131-
list.add(null);
132-
} else {
133-
list.add(data.getDouble(array.offset + i));
134-
}
135-
}
136-
} else if (dt instanceof LongType) {
137-
for (int i = 0; i < array.length; i++) {
138-
if (data.getIsNull(array.offset + i)) {
139-
list.add(null);
140-
} else {
141-
list.add(data.getLong(array.offset + i));
142-
}
143-
}
144-
} else if (dt instanceof StringType) {
145-
for (int i = 0; i < array.length; i++) {
146-
if (data.getIsNull(array.offset + i)) {
147-
list.add(null);
148-
} else {
149-
list.add(toString(data.getByteArray(array.offset + i)));
150-
}
151-
}
152-
} else {
153-
throw new NotImplementedException("Type " + dt);
154-
}
155-
return new GenericArrayData(list);
156-
}
157-
15860
private static void appendValue(ColumnVector dst, DataType t, Object o) {
15961
if (o == null) {
16062
dst.appendNull();

0 commit comments

Comments
 (0)