
Commit faadbd4

JoshRosen authored and yhuai committed
[SPARK-7858] [SQL] Use output schema, not relation schema, for data source input conversion

In `DataSourceStrategy.createPhysicalRDD`, we use the relation schema as the target schema for converting incoming rows into Catalyst rows. However, we should be using the output schema instead, since our scan might return a subset of the relation's columns.

This patch incorporates #6414 by liancheng, which fixes an issue in `SimpleTextRelation` that prevented this bug from being caught by our old tests:

> In `SimpleTextRelation`, we specified `needsConversion` to `true`, indicating that values produced by this testing relation should be of Scala types, and need to be converted to Catalyst types when necessary. However, we also used `Cast` to convert strings to expected data types. And `Cast` always produces values of Catalyst types, thus no conversion is done at all. This PR makes `SimpleTextRelation` produce Scala values so that data conversion code paths can be properly tested.

Closes #5986.

Author: Josh Rosen <[email protected]>
Author: Cheng Lian <[email protected]>
Author: Cheng Lian <[email protected]>

Closes #6400 from JoshRosen/SPARK-7858 and squashes the following commits:

e71c866 [Josh Rosen] Re-fix bug so that the tests pass again
56b13e5 [Josh Rosen] Add regression test to hadoopFsRelationSuites
2169a0f [Josh Rosen] Remove use of SpecificMutableRow and BufferedIterator
6cd7366 [Josh Rosen] Fix SPARK-7858 by using output types for conversion.
5a00e66 [Josh Rosen] Add assertions in order to reproduce SPARK-7858
8ba195c [Cheng Lian] Merge 9968fba into 6166473
9968fba [Cheng Lian] Tests the data type conversion code paths

(cherry picked from commit 0c33c7b)
Signed-off-by: Yin Huai <[email protected]>

1 parent 311fcf6 commit faadbd4
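
To make the schema mismatch concrete, here is a minimal sketch (hypothetical code, not part of this commit; the relation, column names, and values are made up). A relation with schema (a: Int, b: String) is scanned with a projection that returns only column b. Converters built from the relation schema are one per relation column and no longer line up with the rows the scan actually emits; converters built from the output types do:

// Hypothetical sketch of SPARK-7858; the schema and values below are illustrative only.
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types._

val relationSchema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StringType)))

// The scan projects only column `b`, so each produced row carries a single String value.
val scannedRow: Seq[Any] = Seq("val_1")
val outputTypes: Seq[DataType] = Seq(StringType)

// Before the fix: converters derived from the relation schema (two converters,
// IntegerType first), which do not line up with the single String column produced.
val relationConverters = relationSchema.fields.map(f =>
  CatalystTypeConverters.createToCatalystConverter(f.dataType))

// After the fix: converters derived from the output types, one per produced column.
val outputConverters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
val converted = scannedRow.zip(outputConverters).map { case (v, c) => c(v) }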

File tree

5 files changed, +37 -41 lines changed


sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 1 addition & 1 deletion
@@ -392,7 +392,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     SparkPlan.currentContext.set(self)
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
-    val rowRDD = RDDConversions.productToRowRdd(rdd, schema)
+    val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
     DataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala

Lines changed: 24 additions & 38 deletions
@@ -21,63 +21,49 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{Row, SQLContext}
 
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
 object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], schema: StructType): RDD[Row] = {
+  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[Row] = {
     data.mapPartitions { iterator =>
-      if (iterator.isEmpty) {
-        Iterator.empty
-      } else {
-        val bufferedIterator = iterator.buffered
-        val mutableRow = new SpecificMutableRow(schema.fields.map(_.dataType))
-        val schemaFields = schema.fields.toArray
-        val converters = schemaFields.map {
-          f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
-        }
-        bufferedIterator.map { r =>
-          var i = 0
-          while (i < mutableRow.length) {
-            mutableRow(i) = converters(i)(r.productElement(i))
-            i += 1
-          }
-
-          mutableRow
+      val numColumns = outputTypes.length
+      val mutableRow = new GenericMutableRow(numColumns)
+      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+      iterator.map { r =>
+        var i = 0
+        while (i < numColumns) {
+          mutableRow(i) = converters(i)(r.productElement(i))
+          i += 1
         }
+
+        mutableRow
       }
     }
   }
 
   /**
    * Convert the objects inside Row into the types Catalyst expected.
    */
-  def rowToRowRdd(data: RDD[Row], schema: StructType): RDD[Row] = {
+  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[Row] = {
     data.mapPartitions { iterator =>
-      if (iterator.isEmpty) {
-        Iterator.empty
-      } else {
-        val bufferedIterator = iterator.buffered
-        val mutableRow = new GenericMutableRow(bufferedIterator.head.toSeq.toArray)
-        val schemaFields = schema.fields.toArray
-        val converters = schemaFields.map {
-          f => CatalystTypeConverters.createToCatalystConverter(f.dataType)
-        }
-        bufferedIterator.map { r =>
-          var i = 0
-          while (i < mutableRow.length) {
-            mutableRow(i) = converters(i)(r(i))
-            i += 1
-          }
-
-          mutableRow
+      val numColumns = outputTypes.length
+      val mutableRow = new GenericMutableRow(numColumns)
+      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
+      iterator.map { r =>
+        var i = 0
+        while (i < numColumns) {
+          mutableRow(i) = converters(i)(r(i))
+          i += 1
         }
+
+        mutableRow
       }
     }
   }
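
For reference, a minimal caller-side sketch of the new signature (hypothetical; only the argument shapes matter here): the caller now passes one DataType per column its rows actually contain, in output order, instead of a full StructType.

// Hypothetical caller of the new rowToRowRdd signature; the RDD contents are made up.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.RDDConversions
import org.apache.spark.sql.types.{IntegerType, StringType}

def convertScanOutput(scan: RDD[Row]): RDD[Row] = {
  // Rows are assumed to carry (name: String, age: Int) in Scala types; the Seq
  // below must line up one-to-one with those columns.
  RDDConversions.rowToRowRdd(scan, Seq(StringType, IntegerType))
}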

sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala

Lines changed: 1 addition & 1 deletion
@@ -309,7 +309,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       output: Seq[Attribute],
       rdd: RDD[Row]): SparkPlan = {
     val converted = if (relation.needConversion) {
-      execution.RDDConversions.rowToRowRdd(rdd, relation.schema)
+      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
     } else {
       rdd
     }

sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala

Lines changed: 5 additions & 1 deletion
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputForma
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLContext}
@@ -108,7 +109,10 @@ class SimpleTextRelation(
 
     sparkContext.textFile(inputStatuses.map(_.getPath).mkString(",")).map { record =>
       Row(record.split(",").zip(fields).map { case (value, dataType) =>
-        Cast(Literal(value), dataType).eval()
+        // `Cast`ed values are always of Catalyst types (i.e. UTF8String instead of String, etc.)
+        val catalystValue = Cast(Literal(value), dataType).eval()
+        // Here we're converting Catalyst values to Scala values to test `needsConversion`
+        CatalystTypeConverters.convertToScala(catalystValue, dataType)
      }: _*)
    }
  }
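
The distinction the new comments describe can be seen in a small sketch (hypothetical, using the same 1.4-era APIs this patch touches): `Cast(...).eval()` yields a Catalyst-internal value (for `StringType`, a `UTF8String`), and `CatalystTypeConverters.convertToScala` turns it back into a plain Scala value, which is what a relation with `needsConversion` enabled is expected to produce.

// Hypothetical sketch; the value and type are illustrative only.
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types.StringType

// Cast produces a Catalyst-internal value: for StringType this is a UTF8String,
// not a java.lang.String.
val catalystValue = Cast(Literal(42), StringType).eval()

// convertToScala maps it back to a plain Scala value (a java.lang.String here),
// so the data source actually exercises the conversion code path.
val scalaValue = CatalystTypeConverters.convertToScala(catalystValue, StringType)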

sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala

Lines changed: 6 additions & 0 deletions
@@ -76,6 +76,12 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
       df.filter('a > 1 && 'p1 < 2).select('b, 'p1),
       for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1))
 
+    // Project many copies of columns with different types (reproduction for SPARK-7858)
+    checkAnswer(
+      df.filter('a > 1 && 'p1 < 2).select('b, 'b, 'b, 'b, 'p1, 'p1, 'p1, 'p1),
+      for (i <- 2 to 3; _ <- Seq("foo", "bar"))
+        yield Row(s"val_$i", s"val_$i", s"val_$i", s"val_$i", 1, 1, 1, 1))
+
     // Self-join
     df.registerTempTable("t")
     withTempTable("t") {
