@@ -23,69 +23,58 @@ import org.apache.spark.sql.QueryTest
2323import org .apache .spark .sql .catalyst .InternalRow
2424import org .apache .spark .sql .execution .vectorized .{OnHeapColumnVector , WritableColumnVector }
2525import org .apache .spark .sql .test .{SharedSQLContext , SQLTestUtils }
26- import org .apache .spark .sql .types .StructType
26+ import org .apache .spark .sql .types .{ StructField , StructType }
2727import org .apache .spark .unsafe .types .UTF8String .fromString
2828
2929class OrcColumnarBatchReaderSuite extends QueryTest with SQLTestUtils with SharedSQLContext {
30-
31- private val orcFileSchema = TypeDescription .fromString(s " struct<col1:int,col2:int> " )
32- private val requiredSchema = StructType .fromDDL(" col1 int, col3 int" )
33- private val partitionSchema = StructType .fromDDL(" partCol1 string, partCol2 string" )
30+ private val dataSchema = StructType .fromDDL(" col1 int, col2 int" )
31+ private val partitionSchema = StructType .fromDDL(" p1 string, p2 string" )
3432 private val partitionValues = InternalRow (fromString(" partValue1" ), fromString(" partValue2" ))
35- private val resultSchema = StructType (requiredSchema.fields ++ partitionSchema.fields)
36-
37- private val isConstant = classOf [WritableColumnVector ].getDeclaredField(" isConstant" )
38- isConstant.setAccessible(true )
33+ private val orcFileSchemaList = Seq ( // ORC file schemas each test is registered for: data columns only, data plus both partition columns, data plus only p1, data plus only p2
34+ " struct<col1:int,col2:int>" , " struct<col1:int,col2:int,p1:string,p2:string>" ,
35+ " struct<col1:int,col2:int,p1:string>" , " struct<col1:int,col2:int,p2:string>" )
36+ orcFileSchemaList.foreach { case schema =>
37+ val orcFileSchema = TypeDescription .fromString(schema)
3938
40- private def getReader (requestedDataColIds : Array [Int ], requestedPartitionColIds : Array [Int ]) = {
41- val reader = new OrcColumnarBatchReader (false , false , 4096 )
42- reader.initBatch(
43- orcFileSchema,
44- resultSchema.fields,
45- requestedDataColIds,
46- requestedPartitionColIds,
47- partitionValues)
48- reader
49- }
50-
51- test(" requestedPartitionColIds resets requestedDataColIds - all partitions are requested" ) {
52- val requestedDataColIds = Array (0 , 1 , 0 , 0 )
53- val requestedPartitionColIds = Array (- 1 , - 1 , 0 , 1 )
54- val reader = getReader(requestedDataColIds, requestedPartitionColIds)
55- assert(reader.requestedDataColIds === Array (0 , 1 , - 1 , - 1 ))
56- }
39+ val isConstant = classOf [WritableColumnVector ].getDeclaredField(" isConstant" )
40+ isConstant.setAccessible(true )
5741
58- test(" requestedPartitionColIds resets requestedDataColIds - one partition is requested" ) {
59- Seq ((Array (- 1 , - 1 , 0 , - 1 ), Array (0 , 1 , - 1 , 0 )),
60- (Array (- 1 , - 1 , - 1 , 0 ), Array (0 , 1 , 0 , - 1 ))).foreach {
61- case (requestedPartitionColIds, answer) =>
62- val requestedDataColIds = Array (0 , 1 , 0 , 0 )
63- val reader = getReader(requestedDataColIds, requestedPartitionColIds)
64- assert(reader.requestedDataColIds === answer)
42+ def getReader ( // Builds an OrcColumnarBatchReader and calls initBatch on it for the ORC schema of the current foreach iteration.
43+ requestedDataColIds : Array [Int ],
44+ requestedPartitionColIds : Array [Int ],
45+ resultFields : Array [StructField ]): OrcColumnarBatchReader = {
46+ val reader = new OrcColumnarBatchReader (false , false , 4096 ) // NOTE(review): the three positional arguments are unnamed here; confirm their meaning against the OrcColumnarBatchReader constructor
47+ reader.initBatch(
48+ orcFileSchema, // TypeDescription parsed from this iteration's schema string
49+ resultFields,
50+ requestedDataColIds,
51+ requestedPartitionColIds,
52+ partitionValues) // suite-level row holding "partValue1" and "partValue2"
53+ reader // returned after initBatch has been called on it
6554 }
66- }
67-
68- test(" initBatch should initialize requested partition columns only" ) {
69- val requestedDataColIds = Array (0 , - 1 , - 1 , - 1 ) // only `col1` is requested, `col3` doesn't exist
70- val requestedPartitionColIds = Array (- 1 , - 1 , 0 , - 1 ) // only `partCol1` is requested
71- val reader = getReader(requestedDataColIds, requestedPartitionColIds)
72- val batch = reader.columnarBatch
73- assert(batch.numCols() === 4 )
7455
75- assert(batch.column(0 ).isInstanceOf [OrcColumnVector ])
76- assert(batch.column(1 ).isInstanceOf [OnHeapColumnVector ])
77- assert(batch.column(2 ).isInstanceOf [OnHeapColumnVector ])
78- assert(batch.column(3 ).isInstanceOf [OnHeapColumnVector ])
56+ test(s " all partitions are requested: $schema" ) {
57+ val requestedDataColIds = Array (0 , 1 , 0 , 0 ) // one entry per result field (col1, col2, p1, p2); the two partition slots deliberately start at 0
58+ val requestedPartitionColIds = Array (- 1 , - 1 , 0 , 1 ) // -1 for the data fields; p1 and p2 map to partition ordinals 0 and 1
59+ val reader = getReader(requestedDataColIds, requestedPartitionColIds,
60+ dataSchema.fields ++ partitionSchema.fields) // result fields: col1, col2, p1, p2
61+ assert(reader.requestedDataColIds === Array (0 , 1 , - 1 , - 1 )) // the data ids of the partition slots are expected to have been reset to -1
62+ }
7963
80- val col3 = batch.column(1 ).asInstanceOf [OnHeapColumnVector ]
81- val partCol1 = batch.column(2 ).asInstanceOf [OnHeapColumnVector ]
82- val partCol2 = batch.column(3 ).asInstanceOf [OnHeapColumnVector ]
64+ test(s " initBatch should initialize requested partition columns only: $schema" ) {
65+ val requestedDataColIds = Array (0 , - 1 ) // `col1` is data column 0; the second result field is `p1`, which has no data column id (-1)
66+ val requestedPartitionColIds = Array (- 1 , 0 ) // `col1` is not a partition column (-1); `p1` is partition ordinal 0
67+ val reader = getReader(requestedDataColIds, requestedPartitionColIds,
68+ Array (dataSchema.fields(0 ), partitionSchema.fields(0 ))) // result fields: `col1` and `p1` only
69+ val batch = reader.columnarBatch
70+ assert(batch.numCols() === 2 )
8371
72+ assert(batch.column(0 ).isInstanceOf [OrcColumnVector ])
73+ assert(batch.column(1 ).isInstanceOf [OnHeapColumnVector ])
8774
75+ val p1 = batch.column(1 ).asInstanceOf [OnHeapColumnVector ]
76+ assert(isConstant.get(p1).asInstanceOf [Boolean ]) // Partition column is constant; `isConstant` is read reflectively from WritableColumnVector.
77+ assert(p1.getUTF8String(0 ) === partitionValues.getUTF8String(0 )) // `p1` must carry the first partition value
78+ }
9079 }
9180}
0 commit comments